diff --git a/src/apex/apex.cpp b/src/apex/apex.cpp index be2f449b..eacd1f72 100644 --- a/src/apex/apex.cpp +++ b/src/apex/apex.cpp @@ -714,6 +714,17 @@ inline std::shared_ptr _new_task( return tt_ptr; } +/* This function helps because it's possible that the user could call the + * API from a new thread without registering the thread. So, make sure it + * got registered. + */ +bool register_thread_helper(void) { + if (!_registered) { + register_thread("Worker Thread", nullptr); + } + return true; +} + profiler* start(const std::string &timer_name) { in_apex prevent_deadlocks; @@ -745,6 +756,8 @@ profiler* start(const std::string &timer_name) APEX_UTIL_REF_COUNT_SUSPENDED_START return profiler::get_disabled_profiler(); } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); std::shared_ptr tt_ptr(nullptr); profiler * new_profiler = nullptr; if (_notify_listeners) { @@ -812,6 +825,8 @@ profiler* start(const apex_function_address function_address) { APEX_UTIL_REF_COUNT_SUSPENDED_START return profiler::get_disabled_profiler(); } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); std::shared_ptr tt_ptr(nullptr); profiler * new_profiler = nullptr; if (_notify_listeners) { @@ -885,6 +900,8 @@ void start(std::shared_ptr tt_ptr) { tt_ptr->prof = profiler::get_disabled_profiler(); return; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); // get the thread id that is running this task if (tt_ptr->thread_id != thread_instance::instance().get_id()) { printf("Task %s created by %lu started by %lu\n", tt_ptr->task_id->get_name().c_str(), @@ -943,6 +960,8 @@ void resume(std::shared_ptr tt_ptr) { APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE return; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); //APEX_ASSERT(tt_ptr->state == task_wrapper::YIELDED); tt_ptr->state = task_wrapper::RUNNING; if (_notify_listeners) { @@ -1054,6 +1073,8 @@ profiler* resume(profiler * p) { APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE return nullptr; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); p->restart(); APEX_ASSERT(p->tt_ptr->state == task_wrapper::STOPPED); p->tt_ptr->state = task_wrapper::RUNNING; @@ -1150,6 +1171,8 @@ void apex::stop_internal(profiler* the_profiler) { APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE return; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); std::shared_ptr p{the_profiler}; APEX_ASSERT(p->tt_ptr->state == task_wrapper::RUNNING); p->tt_ptr->state = task_wrapper::STOPPED; @@ -1213,6 +1236,8 @@ void stop(profiler* the_profiler, bool cleanup) { APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE return; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); std::shared_ptr p{the_profiler}; APEX_ASSERT(p->tt_ptr->state == task_wrapper::RUNNING); p->tt_ptr->state = task_wrapper::STOPPED; @@ -1298,6 +1323,8 @@ void stop(std::shared_ptr tt_ptr) { if (tt_ptr->state == task_wrapper::YIELDED) { resume(tt_ptr); } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); APEX_ASSERT(tt_ptr->state == task_wrapper::RUNNING); tt_ptr->state = task_wrapper::STOPPED; if (tt_ptr->prof != nullptr) { @@ -1362,6 +1389,8 @@ void yield(profiler* the_profiler) { thread_instance::instance().clear_current_profiler(the_profiler, false, null_task_wrapper); } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); std::shared_ptr p{the_profiler}; APEX_ASSERT(p->tt_ptr->state == task_wrapper::RUNNING); p->tt_ptr->state = task_wrapper::YIELDED; @@ -1418,6 +1447,8 @@ void yield(std::shared_ptr tt_ptr) { thread_instance::instance().clear_current_profiler(tt_ptr->prof, true, tt_ptr); } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); std::shared_ptr p{tt_ptr->prof}; APEX_ASSERT(tt_ptr->state == task_wrapper::RUNNING); tt_ptr->state = task_wrapper::YIELDED; @@ -1454,6 +1485,8 @@ void sample_value(const std::string &name, double value, bool threaded) if (apex_options::suspend() == true) { return; } apex* instance = apex::instance(); // get the Apex static instance if (!instance) return; // protect against calls after finalization + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); // parse the counter name // either /threadqueue{locality#0/total}/length // or /threadqueue{locality#0/worker-thread#0}/length @@ -2459,6 +2492,8 @@ void send (uint64_t tag, uint64_t size, uint64_t target) { apex* instance = apex::instance(); // protect against calls after finalization if (!instance || _exited) { return ; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); if (_notify_listeners) { // eventually, we want to use the thread id, but for now, just use 0. @@ -2488,6 +2523,8 @@ void recv (uint64_t tag, uint64_t size, uint64_t source_rank, uint64_t apex* instance = apex::instance(); // protect against calls after finalization if (!instance || _exited) { return ; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); if (_notify_listeners) { // eventually, we want to use the thread id, but for now, just use 0. diff --git a/src/apex/profiler_listener.cpp b/src/apex/profiler_listener.cpp index ac556687..1eab0541 100644 --- a/src/apex/profiler_listener.cpp +++ b/src/apex/profiler_listener.cpp @@ -1903,6 +1903,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl // get the right task identifier, based on whether there are aliases profiler * p = new profiler(tt_ptr, is_resume); p->thread_id = _pls.my_tid; + APEX_ASSERT(p->thread_id == (unsigned int)thread_instance::get_id()); p->guid = tt_ptr->guid; thread_instance::instance().set_current_profiler(p); #if APEX_HAVE_PAPI diff --git a/src/apex/taskstubs_implementation.cpp b/src/apex/taskstubs_implementation.cpp index b0d25a97..989e8c9f 100644 --- a/src/apex/taskstubs_implementation.cpp +++ b/src/apex/taskstubs_implementation.cpp @@ -13,6 +13,8 @@ #include #include #include +#include + using maptype = std::unordered_map>; @@ -32,10 +34,22 @@ maptype& getMyMap(void) { return theMap; } -void safePrint(const char * format, tasktimer_guid_t guid) { +int verbosePrint(int priority, const char *format, ...) +{ static std::mutex local_mtx; std::scoped_lock lock{local_mtx}; - printf("%lu %s GUID %lu\n", apex::thread_instance::get_id(), format, guid); + va_list args; + va_start(args, format); + vprintf(format, args); + va_end(args); +} + +#define VERBOSE_PRINTF(...) if (apex::apex_options::use_verbose()) { printf(__VA_ARGS__); } + +void safePrint(const char * format, tasktimer_guid_t guid, const char * event) { + //static std::mutex local_mtx; + //std::scoped_lock lock{local_mtx}; + VERBOSE_PRINTF("%lu TS: %s: GUID %lu %s\n", apex::thread_instance::get_id(), format, guid, event); return; } @@ -56,7 +70,7 @@ void safeInsert( } std::shared_ptr safeLookup( - tasktimer_guid_t guid) { + tasktimer_guid_t guid, const char * event) { #if 0 // in the thread local map? auto task = getMyMap().find(guid); @@ -77,7 +91,7 @@ std::shared_ptr safeLookup( std::scoped_lock lock{mtx()}; auto task = getCommonMap().find(guid); if (task == getCommonMap().end()) { - safePrint("Not found", guid); + safePrint("Not found", guid, event); //APEX_ASSERT(false); return nullptr; } @@ -106,7 +120,7 @@ extern "C" { } void tasktimer_finalize_impl(void) { /* Debatable whether we want to do this finalize */ - apex::finalize(); + //apex::finalize(); } // measurement function declarations tasktimer_timer_t tasktimer_create_impl( @@ -117,16 +131,18 @@ extern "C" { const uint64_t parent_count) { static bool& over = apex::get_program_over(); if (over) return nullptr; + safePrint("Creating", timer_guid, timer_name); + // need to look up the parent shared pointers? std::vector> parent_tasks; for (uint64_t i = 0 ; i < parent_count ; i++) { - auto tmp = safeLookup(parent_guids[i]); + auto tmp = safeLookup(parent_guids[i], "parent lookup"); if (tmp != nullptr) parent_tasks.push_back(tmp); } // if no name, use address if (timer_name == nullptr || strlen(timer_name) == 0) { - //printf("Null name for timer: %p\n", function_address); + //VERBOSE_PRINTF("Null name for timer: %p\n", function_address); if (parent_count > 0) { auto task = apex::new_task( (apex_function_address)function_address, @@ -153,10 +169,20 @@ extern "C" { } return (tasktimer_timer_t)(timer_guid); } + +#define MAP_TASK(_timer, _apex_timer, _event) \ + static bool& over_{apex::get_program_over()}; \ + if (over_) return; \ + uint64_t _tmp = (uint64_t)(_timer); \ + auto _apex_timer = safeLookup(_tmp, _event); + void tasktimer_schedule_impl( tasktimer_timer_t timer, tasktimer_argument_value_p arguments, uint64_t argument_count) { + MAP_TASK(timer, apex_timer, "schedule"); + VERBOSE_PRINTF("%lu TS: Scheduling: %lu %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); static bool& over = apex::get_program_over(); if (over) return; // TODO: handle the schedule event, somehow @@ -165,18 +191,13 @@ extern "C" { APEX_UNUSED(argument_count); } -#define MAP_TASK(_timer, _apex_timer) \ - static bool& over_{apex::get_program_over()}; \ - if (over_) return; \ - uint64_t _tmp = (uint64_t)(_timer); \ - auto _apex_timer = safeLookup(_tmp); - void tasktimer_start_impl( tasktimer_timer_t timer, tasktimer_execution_space_p) { // TODO: capture the execution space, somehow...a new task? - MAP_TASK(timer, apex_timer); - //printf("%lu TS: Starting: %lu\n", apex::thread_instance::get_id(), timer); + MAP_TASK(timer, apex_timer, "start"); + VERBOSE_PRINTF("%lu TS: Starting: %lu %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); if (apex_timer != nullptr) { apex::start(apex_timer); } @@ -186,8 +207,9 @@ extern "C" { #if 1 static bool& over = apex::get_program_over(); if (over) return; - MAP_TASK(timer, apex_timer); - //printf("%lu TS: Yielding: %lu\n", apex::thread_instance::get_id(), timer); + MAP_TASK(timer, apex_timer, "yield"); + VERBOSE_PRINTF("%lu TS: Yielding: %lu %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); if (apex_timer != nullptr) { apex::yield(apex_timer); } @@ -198,8 +220,9 @@ extern "C" { tasktimer_execution_space_p) { #if 1 // TODO: capture the execution space, somehow...a new task? - MAP_TASK(timer, apex_timer); - //printf("%lu TS: Resuming: %lu\n", apex::thread_instance::get_id(), timer); + MAP_TASK(timer, apex_timer, "resume"); + VERBOSE_PRINTF("%lu TS: Resuming: %lu %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); // TODO: why no resume function for task_wrapper objects? if (apex_timer != nullptr) { apex::resume(apex_timer); @@ -210,15 +233,28 @@ extern "C" { } void tasktimer_stop_impl( tasktimer_timer_t timer) { - MAP_TASK(timer, apex_timer); - //printf("%lu TS: Stopping: %lu\n", apex::thread_instance::get_id(), timer); + static std::set stopped; + static std::mutex local_mtx; + MAP_TASK(timer, apex_timer, "stop"); + VERBOSE_PRINTF("%lu TS: Stopping: %lu %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); if (apex_timer != nullptr) { + { + std::scoped_lock lock{local_mtx}; + if (stopped.count(timer) > 0) { + VERBOSE_PRINTF("%lu TS: ERROR! TIMER STOPPED TWICE! : %lu\n", apex::thread_instance::get_id(), timer); + return; + } + stopped.insert(timer); + } apex::stop(apex_timer); } } void tasktimer_destroy_impl( tasktimer_timer_t timer) { - MAP_TASK(timer, apex_timer); + MAP_TASK(timer, apex_timer, "destroy"); + VERBOSE_PRINTF("%lu TS: Destroying: %lu %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); if (apex_timer != nullptr) { // TODO: need to handle the destroy event somehow. // definitely need to remove it from the local map. @@ -229,10 +265,12 @@ extern "C" { tasktimer_timer_t timer, const tasktimer_guid_t* parents, const uint64_t parent_count) { // TODO: need to handle the add parents event - MAP_TASK(timer, apex_timer); + MAP_TASK(timer, apex_timer, "add parents"); + VERBOSE_PRINTF("%lu TS: Adding parents: %lu %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); if (apex_timer != nullptr) { for (uint64_t i = 0 ; i < parent_count ; i++) { - auto tmp = safeLookup(parents[i]); + auto tmp = safeLookup(parents[i], "parent lookup"); if (tmp != nullptr) { // add the parent to the child apex_timer->parents.push_back(tmp); @@ -249,10 +287,12 @@ extern "C" { tasktimer_timer_t timer, const tasktimer_guid_t* children, const uint64_t child_count) { // TODO: need to handle the add children event - MAP_TASK(timer, apex_timer); + MAP_TASK(timer, apex_timer, "add children"); + VERBOSE_PRINTF("%lu TS: Adding children: %lu %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); if (apex_timer != nullptr) { for (uint64_t i = 0 ; i < child_count ; i++) { - auto tmp = safeLookup(children[i]); + auto tmp = safeLookup(children[i], "child lookup"); if (tmp != nullptr) { // add the parent to the child tmp->parents.push_back(apex_timer); @@ -288,7 +328,7 @@ extern "C" { tasktimer_execution_space_p dest_type, const char* dest_name, const void* dest_ptr) { - std::shared_ptr parent = safeLookup(guid); + std::shared_ptr parent = safeLookup(guid, "data transfer"); auto task = apex::new_task("data xfer", 0, parent); timerStack(task, true); } diff --git a/src/apex/thread_instance.cpp b/src/apex/thread_instance.cpp index 183ee7ef..a81fc8ca 100644 --- a/src/apex/thread_instance.cpp +++ b/src/apex/thread_instance.cpp @@ -341,15 +341,17 @@ void thread_instance::clear_current_profiler_untied(profiler * the_profiler, // since we aren't yielding, just stop the children. stop(tmp->prof); // we better be re-entrant safe! } - // this is a serious problem... + // this is a serious problem...or is it? no! if (tmp->prof->untied_parent == nullptr) { + /* // unless...we happen to be exiting. Bets are off. if (apex_options::suspend() == true) { return; } // if we've already cleared the stack on this thread, we're fine if (instance()._exiting) { return; } - std::cerr << "Warning! empty profiler stack!\n"; + std::cerr << "Warning! empty profiler stack!" << __LINE__ << "\n"; APEX_ASSERT(false); //abort(); + */ return; } // get the new top of the stack @@ -423,7 +425,7 @@ void thread_instance::clear_current_profiler(profiler * the_profiler, if (apex_options::suspend() == true) { return; } // if we've already cleared the stack on this thread, we're fine if (instance()._exiting) { return; } - std::cerr << "Warning! empty profiler stack!\n"; + std::cerr << "Warning! empty profiler stack!" << __LINE__ << "\n"; APEX_ASSERT(false); //abort(); return; diff --git a/src/apex/trace_event_listener.cpp b/src/apex/trace_event_listener.cpp index 034faf82..f49e3fcd 100644 --- a/src/apex/trace_event_listener.cpp +++ b/src/apex/trace_event_listener.cpp @@ -125,6 +125,7 @@ inline std::string parents_to_string(std::shared_ptr tt_ptr) { inline void trace_event_listener::_common_start(std::shared_ptr &tt_ptr) { static APEX_NATIVE_TLS long unsigned int tid = get_thread_id_metadata(); + long unsigned int _tid = tt_ptr->prof->thread_id; if (!_terminate) { std::stringstream ss; ss.precision(3); @@ -236,9 +237,9 @@ inline void trace_event_listener::_common_stop(std::shared_ptr &p) { //std::cout << "FLOWING!" << std::endl; uint64_t flow_id = reversed_node_id + get_flow_id(); write_flow_event(ss, parent->get_flow_us()+0.25, 's', "ControlFlow", flow_id, - saved_node_id, parent->thread_id, parent->task_id->get_name(), p->get_task_id()->get_name()); + saved_node_id, parent->thread_id, "", ""); //parent->task_id->get_name(), p->get_task_id()->get_name()); write_flow_event(ss, p->get_start_us()-0.25, 'f', "ControlFlow", flow_id, - saved_node_id, _tid, parent->task_id->get_name(), p->get_task_id()->get_name()); + saved_node_id, _tid, "", ""); //parent->task_id->get_name(), p->get_task_id()->get_name()); } } if (p->tt_ptr->explicit_trace_start) {