diff --git a/CMakeLists.txt b/CMakeLists.txt index 99c12d39..4372c999 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -90,7 +90,7 @@ IF("${CMAKE_BUILD_TYPE}" STREQUAL "Debug") endif(NOT DEFINED APEX_BUILD_EXAMPLES) add_definitions(-DDEBUG) add_definitions(-DCMAKE_BUILD_TYPE=3) - set(APEX_ERROR_HANDLING TRUE) + #set(APEX_ERROR_HANDLING TRUE) endif() IF("${CMAKE_BUILD_TYPE}" STREQUAL "RelWithDebInfo") @@ -964,6 +964,13 @@ else() "Try manually check out https://github.com/khuck/taskStubs.git to ${PROJECT_SOURCE_DIR}") endif() +if(APEX_BUILD_TESTS) + # Include path needed for example in src/unit_tests/C++ + include_directories(${PROJECT_SOURCE_DIR}/taskStubs/timer_plugin) + # Build the taskstubs library for testing purposes + add_subdirectory (taskStubs) +endif(APEX_BUILD_TESTS) + if(APEX_WITH_PLUGINS) message(INFO " apex will be built with plugin support.") set(LIBS ${LIBS} ${CMAKE_DL_LIBS}) diff --git a/src/apex/address_resolution.cpp b/src/apex/address_resolution.cpp index 87780c0e..86ee3484 100644 --- a/src/apex/address_resolution.cpp +++ b/src/apex/address_resolution.cpp @@ -118,8 +118,18 @@ namespace apex { if (rc == 0) { } else { node->info.probeAddr = ip; - node->info.filename = strdup(info.dli_fname); - node->info.funcname = strdup(info.dli_sname); + if (info.dli_fname == nullptr) { + node->info.filename = strdup("unknown"); + } else { + node->info.filename = strdup(info.dli_fname); + } + if (info.dli_sname == nullptr) { + stringstream ss; + ss << "UNRESOLVED ADDR 0x" << hex << ip; + node->info.funcname = strdup(ss.str().c_str()); + } else { + node->info.funcname = strdup(info.dli_sname); + } } #endif // no APEX_HAVE_BFD #endif // no __APPLE__ diff --git a/src/apex/apex.cpp b/src/apex/apex.cpp index 80135e28..725fcc85 100644 --- a/src/apex/apex.cpp +++ b/src/apex/apex.cpp @@ -47,10 +47,8 @@ #if defined(APEX_WITH_PERFETTO) #include "perfetto_listener.hpp" #endif -#if defined(APEX_DEBUG) || defined(APEX_ERROR_HANDLING) // #define APEX_DEBUG_disabled #include "apex_error_handling.hpp" -#endif #include "address_resolution.hpp" #ifdef APEX_HAVE_OTF2 @@ -93,7 +91,7 @@ DEFINE_DESTRUCTOR(apex_finalize_static_void) #include "banner.hpp" #include "apex_dynamic.hpp" -#if APEX_DEBUG +#ifdef APEX_DEBUG #define FUNCTION_ENTER if (apex_options::use_verbose()) { \ fprintf(stderr, "enter %lu *** %s:%d!\n", \ thread_instance::get_id(), __APEX_FUNCTION__, __LINE__); fflush(stdout); } @@ -114,7 +112,7 @@ using namespace std; namespace apex { -bool& apex::get_program_over() { +bool& get_program_over() { static bool _program_over{false}; return _program_over; } @@ -534,16 +532,8 @@ uint64_t init(const char * thread_name, uint64_t comm_rank, * and if so, don't be suspended long enough to enable the main timer. */ bool suspended = apex_options::suspend(); apex_options::suspend(false); - /* For the main thread, we should always start a top level timer. - * The reason is that if the program calls "exit", our atexit() processing - * will stop this timer, effectively stopping all of its children as well, - * so we will get an accurate measurement for abnormal termination. */ - auto main = task_wrapper::get_apex_main_wrapper(); - // make sure the tracing support puts APEX MAIN on the right thread - // when tracing HPX - the finalization will almost assuredly not - // be stopped on the thread that is calling apex::init. You've been warned. - main->explicit_trace_start = true; - start(main); + /* No need to start a top level timer here, it happens + in the profiler listener. */ if (apex_options::top_level_os_threads()) { // start top-level timer for main thread, it will get automatically // stopped when the main wrapper timer is stopped. @@ -556,8 +546,9 @@ uint64_t init(const char * thread_name, uint64_t comm_rank, task_name = "Main Thread"; } std::shared_ptr twp = - new_task(task_name, UINTMAX_MAX, main); + new_task(task_name, UINTMAX_MAX, task_wrapper::get_apex_main_wrapper()); start(twp); + APEX_ASSERT(twp->state == task_wrapper::RUNNING); thread_instance::set_top_level_timer(twp); } /* restore the suspended bit */ @@ -597,9 +588,9 @@ uint64_t init(const char * thread_name, uint64_t comm_rank, } // It's now safe to initialize CUDA and/or HIP and/or Level0 - dynamic::cuda::init(); - dynamic::roctracer::init(); - dynamic::level0::init(); + if (apex_options::use_cuda()) { dynamic::cuda::init(); } + if (apex_options::use_hip()) { dynamic::roctracer::init(); } + if (apex_options::use_level0()) { dynamic::level0::init(); } // Unset the LD_PRELOAD variable, because Active Harmony is going to // fork/execv a new session-core process, and we don't want APEX in @@ -613,18 +604,11 @@ uint64_t init(const char * thread_name, uint64_t comm_rank, //printf("APEX Version: %s\n", instance->version_string.c_str()); //printf("Executing command line: %s\n", getCommandLine().c_str()); std::stringstream ss; - //ss << apex_banner << "\n"; - ss << " ___ ______ _______ __\n"; - ss << " / _ \\ | ___ \\ ___\\ \\ / /\n"; - ss << "/ /_\\ \\| |_/ / |__ \\ V /\n"; - ss << "| _ || __/| __| / \\\n"; - ss << "| | | || | | |___/ /^\\ \\\n"; - ss << "\\_| |_/\\_| \\____/\\/ \\/\n"; + ss << apex_banner << "\n"; ss << "APEX Version: " << instance->version_string << "\n"; ss << "Executing command line: " << getCommandLine() << "\n" << std::endl; std::string tmp{ss.str()}; fputs(tmp.c_str(), stdout); - } FUNCTION_EXIT return APEX_NOERROR; @@ -639,11 +623,60 @@ string& version() { return instance->version_string; } +class GUIDset : public std::set { +public: + ~GUIDset() { + for (auto& g : *this) { + std::cout << "Orphaned timer: " << std::hex << g << std::endl; + } + } +}; + +void debug_print(const char * event, std::shared_ptr tt_ptr) { + if (get_program_over()) return; + static std::mutex this_mutex; + static GUIDset guids; + std::unique_lock l(this_mutex); + std::stringstream ss; + if (tt_ptr == nullptr) { + ss << thread_instance::get_id() << " " << event << " : (null) : (null)" + << endl; + cout << ss.str(); fflush(stdout); + //APEX_ASSERT(false); + return; + } else { + ss << thread_instance::get_id() << " APEX: " << event << " : " << std::hex << + tt_ptr->guid << " : " << tt_ptr->get_task_id()->get_name() << " - parents: "; + for (auto& p : tt_ptr->parents) { + ss << p->get_task_id()->get_name() << ", "; + } + ss << endl; + cout << ss.str(); fflush(stdout); + } + const std::string _start{"Start"}; + const std::string _yield{"Yield"}; + const std::string _stop{"Stop"}; + const std::string _resume{"Resume"}; + if (_start.compare(event) == 0) { + guids.insert(tt_ptr->guid); + } else { + guids.erase(tt_ptr->guid); + } +} + +#if defined(APEX_DEBUG)//_disabled) +#define LOCAL_DEBUG_PRINT(_event_, _tt_ptr_) if (apex_options::use_verbose()) { debug_print(_event_, _tt_ptr_); } +#else +#define LOCAL_DEBUG_PRINT(_event_, _tt_ptr_) +#endif + /* Populate the new task_wrapper object, and notify listeners. */ inline std::shared_ptr _new_task( task_identifier * id, const uint64_t task_id, - const std::shared_ptr parent_task, apex* instance) { + const std::vector> parent_tasks, + apex* instance) { + //printf("%s Current profiler: %s\n", __func__, thread_instance::instance().get_current_profiler() == nullptr ? "nullptr" : thread_instance::instance().get_current_profiler()->tt_ptr->task_id->get_name().c_str()); in_apex prevent_deadlocks; APEX_UNUSED(instance); std::shared_ptr tt_ptr = make_shared(); @@ -656,17 +689,16 @@ inline std::shared_ptr _new_task( !apex_options::use_otf2()) { tt_ptr->parent = task_wrapper::get_apex_main_wrapper(); // was a parent passed in? - } else */ if (parent_task != nullptr) { - tt_ptr->parent_guid = parent_task->guid; - tt_ptr->parent = parent_task; + } else */ if (parent_tasks.size() > 0) { + tt_ptr->parents = parent_tasks; // if not, is there a current timer? } else { - profiler * p = thread_instance::instance().get_current_profiler(); + auto p = thread_instance::instance().get_current_profiler(); if (p != nullptr) { - tt_ptr->parent_guid = p->guid; - tt_ptr->parent = p->tt_ptr; + //printf("Extracting parent: %s\n", p->tt_ptr->task_id->get_name().c_str()); + tt_ptr->parents.push_back(p->tt_ptr); } else { - tt_ptr->parent = task_wrapper::get_apex_main_wrapper(); + tt_ptr->parents.push_back(task_wrapper::get_apex_main_wrapper()); } } if (apex_options::use_tasktree_output() || apex_options::use_hatchet_output()) { @@ -680,23 +712,19 @@ inline std::shared_ptr _new_task( tt_ptr->guid = task_id; } //instance->active_task_wrappers.insert(tt_ptr); + LOCAL_DEBUG_PRINT("Create", tt_ptr); return tt_ptr; } -void debug_print(const char * event, std::shared_ptr tt_ptr) { - if (apex::get_program_over()) return; - static std::mutex this_mutex; - std::unique_lock l(this_mutex); - std::stringstream ss; - if (tt_ptr == nullptr) { - ss << thread_instance::get_id() << " " << event << " : (null) : (null)" - << endl; - cout << ss.str(); fflush(stdout); - } else { - ss << thread_instance::get_id() << " " << event << " : " << - tt_ptr->guid << " : " << tt_ptr->get_task_id()->get_name() << endl; - cout << ss.str(); fflush(stdout); +/* This function helps because it's possible that the user could call the + * API from a new thread without registering the thread. So, make sure it + * got registered. + */ +bool register_thread_helper(void) { + if (!_registered) { + register_thread("Worker Thread", nullptr); } + return true; } profiler* start(const std::string &timer_name) @@ -730,51 +758,13 @@ profiler* start(const std::string &timer_name) APEX_UTIL_REF_COUNT_SUSPENDED_START return profiler::get_disabled_profiler(); } - std::shared_ptr tt_ptr(nullptr); - profiler * new_profiler = nullptr; - if (_notify_listeners) { - bool success = true; - task_identifier * id = task_identifier::get_task_id(timer_name); - tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); -#if defined(APEX_DEBUG)//_disabled) - if (apex_options::use_verbose()) { debug_print("Start", tt_ptr); } -#endif - APEX_UTIL_REF_COUNT_TASK_WRAPPER - //read_lock_type l(instance->listener_mutex); - /* - std::stringstream dbg; - dbg << thread_instance::get_id() << " Start : " << id->get_name() << endl; - printf("%s\n",dbg.str().c_str()); - fflush(stdout); - */ - for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { - success = instance->listeners[i]->on_start(tt_ptr); - tt_ptr->prof = thread_instance::instance().get_current_profiler(); - if (!success && i == 0) { - //cout << thread_instance::get_id() << " *** Not success! " << - //id->get_name() << endl; fflush(stdout); - APEX_UTIL_REF_COUNT_FAILED_START - return profiler::get_disabled_profiler(); - } - } - // If we are allowing untied timers, clear the timer stack on this thread - if (apex_options::untied_timers() == true) { - new_profiler = thread_instance::instance().get_current_profiler(); - thread_instance::instance().clear_current_profiler(); - } - } -#if defined(APEX_DEBUG) - const std::string apex_process_profile_str("apex::process_profiles"); - if (timer_name.compare(apex_process_profile_str) == 0) { - APEX_UTIL_REF_COUNT_APEX_INTERNAL_START - } else { - APEX_UTIL_REF_COUNT_START - } -#endif - if (apex_options::untied_timers() == true) { - return new_profiler; - } - return thread_instance::instance().restore_children_profilers(tt_ptr); + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); + task_identifier * id = task_identifier::get_task_id(timer_name); + auto tt_ptr = _new_task(id, UINTMAX_MAX, {}, instance); + APEX_ASSERT(tt_ptr->state == task_wrapper::CREATED); + start(tt_ptr); + return thread_instance::instance().get_current_profiler(); } profiler* start(const apex_function_address function_address) { @@ -795,51 +785,17 @@ profiler* start(const apex_function_address function_address) { APEX_UTIL_REF_COUNT_SUSPENDED_START return profiler::get_disabled_profiler(); } - std::shared_ptr tt_ptr(nullptr); - profiler * new_profiler = nullptr; - if (_notify_listeners) { - bool success = true; - task_identifier * id = task_identifier::get_task_id(function_address); - tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); -#if defined(APEX_DEBUG)//_disabled) - if (apex_options::use_verbose()) { debug_print("Start", tt_ptr); } -#endif - APEX_UTIL_REF_COUNT_TASK_WRAPPER - /* - std::stringstream dbg; - dbg << thread_instance::get_id() << " Start : " << id->get_name() << endl; - printf("%s\n",dbg.str().c_str()); - fflush(stdout); - */ - //read_lock_type l(instance->listener_mutex); - for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { - success = instance->listeners[i]->on_start(tt_ptr); - tt_ptr->prof = thread_instance::instance().get_current_profiler(); - if (!success && i == 0) { - //cout << thread_instance::get_id() << " *** Not success! " << - //id->get_name() << endl; fflush(stdout); - APEX_UTIL_REF_COUNT_FAILED_START - return profiler::get_disabled_profiler(); - } - } - // If we are allowing untied timers, clear the timer stack on this thread - if (apex_options::untied_timers() == true) { - new_profiler = thread_instance::instance().get_current_profiler(); - thread_instance::instance().clear_current_profiler(); - } - } - APEX_UTIL_REF_COUNT_START - if (apex_options::untied_timers() == true) { - return new_profiler; - } - return thread_instance::instance().restore_children_profilers(tt_ptr); + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); + task_identifier * id = task_identifier::get_task_id(function_address); + auto tt_ptr = _new_task(id, UINTMAX_MAX, {}, instance); + start(tt_ptr); + return thread_instance::instance().get_current_profiler(); } void start(std::shared_ptr tt_ptr) { in_apex prevent_deadlocks; -#if defined(APEX_DEBUG)//_disabled) - if (apex_options::use_verbose()) { debug_print("Start", tt_ptr); } -#endif + LOCAL_DEBUG_PRINT("Start", tt_ptr); if (tt_ptr == nullptr) { APEX_UTIL_REF_COUNT_APEX_INTERNAL_START return; @@ -868,38 +824,78 @@ void start(std::shared_ptr tt_ptr) { tt_ptr->prof = profiler::get_disabled_profiler(); return; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); // get the thread id that is running this task - tt_ptr->thread_id = thread_instance::instance().get_id(); + if (tt_ptr->thread_id != thread_instance::instance().get_id()) { + printf("Task %s created by %lu started by %lu\n", tt_ptr->task_id->get_name().c_str(), + tt_ptr->thread_id, thread_instance::instance().get_id()); + } + APEX_ASSERT(tt_ptr->state == task_wrapper::CREATED || tt_ptr->state == task_wrapper::YIELDED); + tt_ptr->state = task_wrapper::RUNNING; if (_notify_listeners) { bool success = true; - /* - std::stringstream dbg; - dbg << thread_instance::get_id() << " Start : " << tt_ptr->task_id->get_name() << endl; - printf("%s\n",dbg.str().c_str()); - fflush(stdout); - */ - //read_lock_type l(instance->listener_mutex); for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { success = instance->listeners[i]->on_start(tt_ptr); - tt_ptr->prof = thread_instance::instance().get_current_profiler(); if (!success && i == 0) { - //cout << thread_instance::get_id() << " *** Not success! " << - //id->get_name() << endl; fflush(stdout); APEX_UTIL_REF_COUNT_FAILED_START tt_ptr->prof = profiler::get_disabled_profiler(); return; } } - // If we are allowing untied timers, clear the timer stack on this thread - if (apex_options::untied_timers() == true) { - thread_instance::instance().clear_current_profiler(); - } } APEX_UTIL_REF_COUNT_START thread_instance::instance().restore_children_profilers(tt_ptr); return; } +void resume(std::shared_ptr tt_ptr) { + in_apex prevent_deadlocks; + LOCAL_DEBUG_PRINT("Resume", tt_ptr); + // if APEX is disabled, do nothing. + if (apex_options::disable() == true) { + APEX_UTIL_REF_COUNT_DISABLED_RESUME + return; + } + // if APEX is suspended, do nothing. + if (apex_options::suspend() == true) { + APEX_UTIL_REF_COUNT_SUSPENDED_RESUME + return; + } + apex* instance = apex::instance(); // get the Apex static instance + // protect against calls after finalization + if (!instance || _exited) { + APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE + return; + } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); + APEX_ASSERT(tt_ptr->state == task_wrapper::YIELDED || + tt_ptr->state == task_wrapper::CREATED); + tt_ptr->state = task_wrapper::RUNNING; + if (_notify_listeners) { + APEX_UTIL_REF_COUNT_TASK_WRAPPER + bool success = true; + try { + for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { + success = instance->listeners[i]->on_resume(tt_ptr); + if (!success && i == 0) { + APEX_UTIL_REF_COUNT_FAILED_RESUME + tt_ptr->prof = profiler::get_disabled_profiler(); + return; + } + } + } catch (disabled_profiler_exception &e) { + APEX_UTIL_REF_COUNT_FAILED_RESUME + tt_ptr->prof = profiler::get_disabled_profiler(); + return; + } + } + APEX_UTIL_REF_COUNT_RESUME + thread_instance::instance().restore_children_profilers(tt_ptr); + return; +} + profiler* resume(const std::string &timer_name) { in_apex prevent_deadlocks; // if APEX is disabled, do nothing. @@ -927,30 +923,10 @@ profiler* resume(const std::string &timer_name) { APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE return nullptr; } - std::shared_ptr tt_ptr(nullptr); - if (_notify_listeners) { - task_identifier * id = task_identifier::get_task_id(timer_name); - tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); - APEX_UTIL_REF_COUNT_TASK_WRAPPER - try { - //read_lock_type l(instance->listener_mutex); - for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { - instance->listeners[i]->on_resume(tt_ptr); - } - } catch (disabled_profiler_exception &e) { - APEX_UTIL_REF_COUNT_FAILED_RESUME - return profiler::get_disabled_profiler(); - } - } -#if defined(APEX_DEBUG) - const std::string apex_process_profile_str("apex::process_profiles"); - if (timer_name.compare(apex_process_profile_str) == 0) { - APEX_UTIL_REF_COUNT_APEX_INTERNAL_RESUME - } else { - APEX_UTIL_REF_COUNT_RESUME - } -#endif - return thread_instance::instance().restore_children_profilers(tt_ptr); + task_identifier * id = task_identifier::get_task_id(timer_name); + std::shared_ptr tt_ptr = _new_task(id, UINTMAX_MAX, {}, instance); + resume(tt_ptr); + return thread_instance::instance().get_current_profiler(); } profiler* resume(const apex_function_address function_address) { @@ -971,27 +947,15 @@ profiler* resume(const apex_function_address function_address) { APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE return nullptr; } - std::shared_ptr tt_ptr(nullptr); - if (_notify_listeners) { - task_identifier * id = task_identifier::get_task_id(function_address); - tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); - APEX_UTIL_REF_COUNT_TASK_WRAPPER - try { - //read_lock_type l(instance->listener_mutex); - for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { - instance->listeners[i]->on_resume(tt_ptr); - } - } catch (disabled_profiler_exception &e) { - APEX_UTIL_REF_COUNT_FAILED_RESUME - return profiler::get_disabled_profiler(); - } - } - APEX_UTIL_REF_COUNT_RESUME - return thread_instance::instance().restore_children_profilers(tt_ptr); + task_identifier * id = task_identifier::get_task_id(function_address); + std::shared_ptr tt_ptr = _new_task(id, UINTMAX_MAX, {}, instance); + resume(tt_ptr); + return thread_instance::instance().get_current_profiler(); } profiler* resume(profiler * p) { in_apex prevent_deadlocks; + LOCAL_DEBUG_PRINT("Resume", p->tt_ptr); // if APEX is disabled, do nothing. if (apex_options::disable() == true) { APEX_UTIL_REF_COUNT_DISABLED_RESUME @@ -1004,6 +968,7 @@ profiler* resume(profiler * p) { } if (p->stopped) { APEX_UTIL_REF_COUNT_DOUBLE_STOP + LOCAL_DEBUG_PRINT("Double Stop resume?", p->tt_ptr); return profiler::get_disabled_profiler(); } apex* instance = apex::instance(); // get the Apex static instance @@ -1012,29 +977,10 @@ profiler* resume(profiler * p) { APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE return nullptr; } - p->restart(); - if (_notify_listeners) { - try { - // skip the profiler_listener - we are restoring a child timer - // for a parent that was yielded. - for (unsigned int i = 1 ; i < instance->listeners.size() ; i++) { - instance->listeners[i]->on_resume(p->tt_ptr); - } - } catch (disabled_profiler_exception &e) { - APEX_UTIL_REF_COUNT_FAILED_RESUME - return profiler::get_disabled_profiler(); - } - } -#if defined(APEX_DEBUG) - const std::string apex_process_profile_str("apex::process_profiles"); - if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) - == 0) { - APEX_UTIL_REF_COUNT_APEX_INTERNAL_RESUME - } else { - APEX_UTIL_REF_COUNT_RESUME - } -#endif - return p; + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); + resume(p->tt_ptr); + return thread_instance::instance().get_current_profiler(); } void reset(const std::string &timer_name) { @@ -1087,54 +1033,6 @@ void apex::complete_task(std::shared_ptr task_wrapper_ptr) { } } -void apex::stop_internal(profiler* the_profiler) { - in_apex prevent_deadlocks; - // if APEX is disabled, do nothing. - if (apex_options::disable() == true) { - APEX_UTIL_REF_COUNT_DISABLED_STOP - return; - } - if (the_profiler == profiler::get_disabled_profiler()) { - APEX_UTIL_REF_COUNT_DISABLED_STOP - return; // profiler was throttled. - } - if (the_profiler == nullptr) { - APEX_UTIL_REF_COUNT_NULL_STOP - return; - } - if (the_profiler->stopped) { - APEX_UTIL_REF_COUNT_DOUBLE_STOP - return; - } -#if defined(APEX_DEBUG)//_disabled) - if (apex_options::use_verbose()) { debug_print("Stop", the_profiler->tt_ptr); } -#endif - apex* instance = apex::instance(); // get the Apex static instance - // protect against calls after finalization - if (!instance || _exited || _measurement_stopped) { - APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE - return; - } - std::shared_ptr p{the_profiler}; - if (_notify_listeners) { - //read_lock_type l(instance->listener_mutex); - for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { - instance->listeners[i]->on_stop(p); - } - } -#if defined(APEX_DEBUG) - const std::string apex_process_profile_str("apex::process_profiles"); - if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) - == 0) { - APEX_UTIL_REF_COUNT_APEX_INTERNAL_STOP - } else { - APEX_UTIL_REF_COUNT_STOP - } -#endif - instance->complete_task(p->tt_ptr); - p->tt_ptr = nullptr; -} - void stop(profiler* the_profiler, bool cleanup) { in_apex prevent_deadlocks; // protect against calls after finalization @@ -1153,24 +1051,29 @@ void stop(profiler* the_profiler, bool cleanup) { } if (the_profiler == nullptr) { APEX_UTIL_REF_COUNT_NULL_STOP + LOCAL_DEBUG_PRINT("Stop profiler cleanup", nullptr); return; } if (the_profiler->stopped) { APEX_UTIL_REF_COUNT_DOUBLE_STOP + LOCAL_DEBUG_PRINT("Double Stop cleanup", nullptr); return; } -#if defined(APEX_DEBUG)//_disabled) - if (apex_options::use_verbose()) { debug_print("Stop", the_profiler->tt_ptr); } -#endif - thread_instance::instance().clear_current_profiler(the_profiler, false, - null_task_wrapper); + LOCAL_DEBUG_PRINT("Stop", the_profiler->tt_ptr); + thread_instance::instance().clear_current_profiler(false, + the_profiler->tt_ptr); apex* instance = apex::instance(); // get the Apex static instance // protect against calls after finalization if (!instance || _exited || _measurement_stopped) { APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE return; } - std::shared_ptr p{the_profiler}; + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); + std::shared_ptr p = std::make_shared(*the_profiler); + //std::shared_ptr p = the_profiler->Get(); + APEX_ASSERT(p->tt_ptr->state == task_wrapper::RUNNING); + p->tt_ptr->state = task_wrapper::STOPPED; if (_notify_listeners) { //read_lock_type l(instance->listener_mutex); for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { @@ -1198,6 +1101,7 @@ void stop(profiler* the_profiler, bool cleanup) { //instance->active_task_wrappers.erase(p->tt_ptr); p->tt_ptr = nullptr; } + delete (the_profiler); } void stop(std::shared_ptr tt_ptr) { @@ -1207,9 +1111,7 @@ void stop(std::shared_ptr tt_ptr) { APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE return; } -#if defined(APEX_DEBUG)//_disabled) - if (apex_options::use_verbose()) { debug_print("Stop", tt_ptr); } -#endif + LOCAL_DEBUG_PRINT("Stop", tt_ptr); // if APEX is disabled, do nothing. if (apex_options::disable() == true) { APEX_UTIL_REF_COUNT_DISABLED_STOP @@ -1218,6 +1120,7 @@ void stop(std::shared_ptr tt_ptr) { } if (tt_ptr == nullptr || tt_ptr->prof == nullptr) { APEX_UTIL_REF_COUNT_NULL_STOP + LOCAL_DEBUG_PRINT("Stop tt_ptr", nullptr); return; } apex* instance = apex::instance(); // get the Apex static instance @@ -1227,22 +1130,38 @@ void stop(std::shared_ptr tt_ptr) { } if (tt_ptr->prof->stopped) { APEX_UTIL_REF_COUNT_DOUBLE_STOP + LOCAL_DEBUG_PRINT("Double Stop tt_ptr", tt_ptr); return; } - thread_instance::instance().clear_current_profiler(tt_ptr->prof, false, - null_task_wrapper); + // get the thread id that is running this task + if (tt_ptr->prof->thread_id != thread_instance::instance().get_id() && + apex_options::use_verbose()) { + printf("Task %s started by %lu stopped by %lu\n", tt_ptr->task_id->get_name().c_str(), + tt_ptr->prof->thread_id, thread_instance::instance().get_id()); + //APEX_ASSERT(tt_ptr->prof->thread_id == thread_instance::instance().get_id()); + } + thread_instance::instance().clear_current_profiler(false, tt_ptr); // protect against calls after finalization if (!instance || _exited || _measurement_stopped) { APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE return; } - std::shared_ptr p{tt_ptr->prof}; - if (_notify_listeners) { - //read_lock_type l(instance->listener_mutex); - for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { - instance->listeners[i]->on_stop(p); - } + // how did this happen? + if (tt_ptr->state == task_wrapper::YIELDED) { + resume(tt_ptr); } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); + APEX_ASSERT(tt_ptr->state == task_wrapper::RUNNING); + tt_ptr->state = task_wrapper::STOPPED; + if (tt_ptr->prof != nullptr) { + std::shared_ptr p{tt_ptr->prof}; + if (_notify_listeners) { + //read_lock_type l(instance->listener_mutex); + for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { + instance->listeners[i]->on_stop(p); + } + } /* std::stringstream dbg; dbg << thread_instance::get_id() << "->" << tt_ptr->thread_id << " Stop : " << @@ -1251,19 +1170,19 @@ void stop(std::shared_ptr tt_ptr) { fflush(stdout); */ #if defined(APEX_DEBUG) - const std::string apex_process_profile_str("apex::process_profiles"); - if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) - == 0) { - APEX_UTIL_REF_COUNT_APEX_INTERNAL_STOP - } else { - APEX_UTIL_REF_COUNT_STOP - } + const std::string apex_process_profile_str("apex::process_profiles"); + if (p->tt_ptr->get_task_id()->get_name(false).compare(apex_process_profile_str) + == 0) { + APEX_UTIL_REF_COUNT_APEX_INTERNAL_STOP + } else { + APEX_UTIL_REF_COUNT_STOP + } #endif + } instance->complete_task(tt_ptr); } -void yield(profiler* the_profiler) -{ +void yield(profiler* the_profiler) { in_apex prevent_deadlocks; // if APEX is disabled, do nothing. if (apex_options::disable() == true) { @@ -1288,9 +1207,15 @@ void yield(profiler* the_profiler) APEX_UTIL_REF_COUNT_DOUBLE_YIELD return; } - thread_instance::instance().clear_current_profiler(the_profiler, false, - null_task_wrapper); - std::shared_ptr p{the_profiler}; + LOCAL_DEBUG_PRINT("Yield", the_profiler->tt_ptr); + thread_instance::instance().clear_current_profiler(false, + the_profiler->tt_ptr); + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); + std::shared_ptr p = std::make_shared(*the_profiler); + //std::shared_ptr p = the_profiler->Get(); + APEX_ASSERT(p->tt_ptr->state == task_wrapper::RUNNING); + p->tt_ptr->state = task_wrapper::YIELDED; if (_notify_listeners) { //read_lock_type l(instance->listener_mutex); for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { @@ -1308,14 +1233,12 @@ void yield(profiler* the_profiler) APEX_UTIL_REF_COUNT_YIELD } #endif + delete (the_profiler); } -void yield(std::shared_ptr tt_ptr) -{ +void yield(std::shared_ptr tt_ptr) { in_apex prevent_deadlocks; -#if defined(APEX_DEBUG)//_disabled) - if (apex_options::use_verbose()) { debug_print("Yield", tt_ptr); } -#endif + LOCAL_DEBUG_PRINT("Yield", tt_ptr); // if APEX is disabled, do nothing. if (apex_options::disable() == true) { APEX_UTIL_REF_COUNT_DISABLED_YIELD @@ -1339,9 +1262,12 @@ void yield(std::shared_ptr tt_ptr) APEX_UTIL_REF_COUNT_DOUBLE_YIELD return; } - thread_instance::instance().clear_current_profiler(tt_ptr->prof, - true, tt_ptr); + thread_instance::instance().clear_current_profiler(true, tt_ptr); + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); std::shared_ptr p{tt_ptr->prof}; + APEX_ASSERT(tt_ptr->state == task_wrapper::RUNNING); + tt_ptr->state = task_wrapper::YIELDED; if (_notify_listeners) { //read_lock_type l(instance->listener_mutex); for (unsigned int i = 0 ; i < instance->listeners.size() ; i++) { @@ -1375,6 +1301,8 @@ void sample_value(const std::string &name, double value, bool threaded) if (apex_options::suspend() == true) { return; } apex* instance = apex::instance(); // get the Apex static instance if (!instance) return; // protect against calls after finalization + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); // parse the counter name // either /threadqueue{locality#0/total}/length // or /threadqueue{locality#0/worker-thread#0}/length @@ -1409,36 +1337,68 @@ void sample_value(const std::string &name, double value, bool threaded) } } +#define APEX_CHECK_DISABLE \ + /* if APEX is disabled, do nothing. */ \ + if (apex_options::disable() == true) { \ + APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER \ + return nullptr; \ + } +#define APEX_CHECK_SUSPEND \ + /* if APEX is suspended, do nothing. */ \ + if (apex_options::suspend() == true) { \ + APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER \ + return nullptr; \ + } +#define APEX_CHECK_INTERNAL \ + const std::string apex_internal("apex_internal"); \ + if (starts_with(name, apex_internal)) { \ + APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER \ + /* don't process our own events - queue scrubbing tasks. */ \ + return nullptr; \ + } +#define APEX_CHECK_INSTANCE \ + /* get the Apex static instance */ \ + apex* instance = apex::instance(); \ + /* protect against calls after finalization */ \ + if (!instance || _exited) { \ + APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER \ + return nullptr; \ + } + std::shared_ptr new_task( const std::string &name, const uint64_t task_id, const std::shared_ptr parent_task) { in_apex prevent_deadlocks; - // if APEX is disabled, do nothing. - if (apex_options::disable() == true) { - APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER - return nullptr; - } - // if APEX is suspended, do nothing. - if (apex_options::suspend() == true) { - APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER - return nullptr; - } - const std::string apex_internal("apex_internal"); - if (starts_with(name, apex_internal)) { - APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER - // don't process our own events - queue scrubbing tasks. - return nullptr; + APEX_CHECK_DISABLE + APEX_CHECK_SUSPEND + APEX_CHECK_INTERNAL + APEX_CHECK_INSTANCE + task_identifier * id = task_identifier::get_task_id(name); + std::vector> parents{}; + if (parent_task != null_task_wrapper) { + parents.push_back(parent_task); } - apex* instance = apex::instance(); // get the Apex static instance - if (!instance || _exited) { - APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER - return nullptr; - } // protect against calls after finalization + std::shared_ptr + tt_ptr(_new_task(id, task_id, parents, instance)); + APEX_UTIL_REF_COUNT_TASK_WRAPPER + return tt_ptr; +} + +std::shared_ptr new_task( + const std::string &name, + const uint64_t task_id, + const std::vector> parent_tasks) +{ + in_apex prevent_deadlocks; + APEX_CHECK_DISABLE + APEX_CHECK_SUSPEND + APEX_CHECK_INTERNAL + APEX_CHECK_INSTANCE task_identifier * id = task_identifier::get_task_id(name); std::shared_ptr - tt_ptr(_new_task(id, task_id, parent_task, instance)); + tt_ptr(_new_task(id, task_id, parent_tasks, instance)); APEX_UTIL_REF_COUNT_TASK_WRAPPER return tt_ptr; } @@ -1448,23 +1408,32 @@ std::shared_ptr new_task( const uint64_t task_id, const std::shared_ptr parent_task) { in_apex prevent_deadlocks; - // if APEX is disabled, do nothing. - if (apex_options::disable() == true) { - APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER - return nullptr; } - // if APEX is suspended, do nothing. - if (apex_options::suspend() == true) { - APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER - return nullptr; } - // get the Apex static instance - apex* instance = apex::instance(); - // protect against calls after finalization - if (!instance || _exited) { - APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER - return nullptr; } + APEX_CHECK_DISABLE + APEX_CHECK_SUSPEND + APEX_CHECK_INSTANCE task_identifier * id = task_identifier::get_task_id(function_address); + std::vector> parents{}; + if (parent_task != null_task_wrapper) { + parents.push_back(parent_task); + } std::shared_ptr - tt_ptr(_new_task(id, task_id, parent_task, instance)); + tt_ptr(_new_task(id, task_id, parents, instance)); + APEX_UTIL_REF_COUNT_TASK_WRAPPER + return tt_ptr; +} + +std::shared_ptr new_task( + const apex_function_address function_address, + const uint64_t task_id, + const std::vector> parent_tasks) { + in_apex prevent_deadlocks; + APEX_CHECK_DISABLE + APEX_CHECK_SUSPEND + APEX_CHECK_INSTANCE + task_identifier * id = task_identifier::get_task_id(function_address); + std::shared_ptr + tt_ptr(_new_task(id, task_id, parent_tasks, instance)); + APEX_UTIL_REF_COUNT_TASK_WRAPPER return tt_ptr; } @@ -1486,7 +1455,7 @@ std::shared_ptr update_task( // protect against calls after finalization if (!instance || _exited) { return nullptr; } task_identifier * id = task_identifier::get_task_id(timer_name); - wrapper = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); + wrapper = _new_task(id, UINTMAX_MAX, {}, instance); } else { task_identifier * id = task_identifier::get_task_id(timer_name); // only have to do something if the ID has changed @@ -1530,7 +1499,7 @@ std::shared_ptr update_task( // protect against calls after finalization if (!instance || _exited) { return nullptr; } task_identifier * id = task_identifier::get_task_id(function_address); - wrapper = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance); + wrapper = _new_task(id, UINTMAX_MAX, {}, instance); } else { task_identifier * id = task_identifier::get_task_id(function_address); // only have to do something if the ID has changed @@ -1704,9 +1673,9 @@ std::string dump(bool reset, bool finalizing) { apex* instance = apex::instance(); // protect against calls after finalization if (!instance) { FUNCTION_EXIT return(std::string("")); } - dynamic::cuda::flush(); - dynamic::roctracer::flush(); - dynamic::level0::flush(); + if (apex_options::use_cuda()) { dynamic::cuda::flush(); } + if (apex_options::use_hip()) { dynamic::roctracer::flush(); } + if (apex_options::use_level0()) { dynamic::level0::flush(); } /* only track after N calls to apex::dump() */ index = index + 1; if (apex_options::delay_memory_tracking() && @@ -1736,6 +1705,7 @@ std::string dump(bool reset, bool finalizing) { void finalize(void) { + FUNCTION_ENTER in_apex prevent_deadlocks; if (!_initialized) { return; } // protect against finalization without initialization // prevent re-entry, be extra strict about race conditions - it is @@ -1756,7 +1726,6 @@ void finalize(void) } // if APEX is disabled, do nothing. if (apex_options::disable() == true) { return; } - FUNCTION_ENTER instance->finalizing = true; // don't measure any new pthreads from pthread_create! // FIRST FIRST, check if we have orphaned threads... // See apex::register_thread and apex::exit_thread for more info. @@ -1771,22 +1740,28 @@ void finalize(void) // make sure it hasn't been erased! if (instance->erased_threads.find(t) == instance->erased_threads.end()) { - t->clear_all_profilers(); + //t->clear_all_profilers(); + auto top_profiler = t->get_current_profiler(); + while (top_profiler != nullptr) { + stop(top_profiler); + if (top_profiler->untied_parent == nullptr) { break; } + top_profiler = t->get_current_profiler(); + } } } } } #endif - // FIRST, stop the top level timer, while the infrastructure is still - // functioning. - auto tmp = thread_instance::get_top_level_timer(); - if (tmp != nullptr) { - stop(tmp); - thread_instance::clear_top_level_timer(); - } - // Second, stop the main timer, while the infrastructure is still - // functioning. - instance->the_profiler_listener->stop_main_timer(); + auto top_profiler = thread_instance::instance().get_current_profiler(); + while (top_profiler != nullptr) { + stop(top_profiler); + if (top_profiler->untied_parent == nullptr) { break; } + top_profiler = thread_instance::instance().get_current_profiler(); + } + /* Signal the other threads that have open profiles to exit */ + if (apex_options::top_level_os_threads()) { + //apex_signal_all_threads(); + } // if not done already... shutdown_throttling(); // stop thread scheduler policies /* Do this before OTF2 grabs a final timestamp - we might have @@ -1799,14 +1774,20 @@ void finalize(void) stop_all_async_threads(); // stop OS/HW monitoring, including PAPI /* This could take a while */ - dynamic::cuda::flush(); - dynamic::roctracer::flush(); - dynamic::level0::flush(); + if (apex_options::use_cuda()) { + dynamic::cuda::flush(); + dynamic::cuda::stop(); + } + if (apex_options::use_hip()) { + dynamic::roctracer::flush(); + dynamic::roctracer::stop(); + } + if (apex_options::use_level0()) { + dynamic::level0::flush(); + dynamic::level0::stop(); + } // stop processing new timers/counters/messages/tasks/etc. - dynamic::cuda::stop(); - dynamic::roctracer::stop(); - dynamic::level0::stop(); apex_options::suspend(true); // now, process all output @@ -1846,7 +1827,7 @@ void finalize(void) void cleanup(void) { in_apex prevent_deadlocks; FUNCTION_ENTER - apex::get_program_over() = true; + get_program_over() = true; #ifdef APEX_HAVE_HPX // prevent crash at shutdown. return; @@ -1955,6 +1936,7 @@ void register_thread(const std::string &name, std::unique_lock l{instance->thread_instance_mutex}; instance->known_threads.insert(&ti); } + apex_register_thread_cleanup(); } void exit_thread(void) @@ -1979,13 +1961,12 @@ void exit_thread(void) instance->known_threads.erase(&ti); } } - auto tmp = thread_instance::get_top_level_timer(); + auto top_profiler = thread_instance::instance().get_current_profiler(); // tell the timer cleanup that we are exiting thread_instance::exiting(); - //printf("Old thread: %p\n", &(*tmp)); - if (tmp != nullptr) { - stop(tmp); - thread_instance::clear_top_level_timer(); + while (top_profiler != nullptr) { + stop(top_profiler); + top_profiler = thread_instance::instance().get_current_profiler(); } // ok to set this now - we need everything still running _exited = true; @@ -2306,6 +2287,8 @@ void send (uint64_t tag, uint64_t size, uint64_t target) { apex* instance = apex::instance(); // protect against calls after finalization if (!instance || _exited) { return ; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); if (_notify_listeners) { // eventually, we want to use the thread id, but for now, just use 0. @@ -2335,6 +2318,8 @@ void recv (uint64_t tag, uint64_t size, uint64_t source_rank, uint64_t apex* instance = apex::instance(); // protect against calls after finalization if (!instance || _exited) { return ; } + // make sure APEX knows about this worker thread! + [[maybe_unused]] thread_local static bool _helper = register_thread_helper(); if (_notify_listeners) { // eventually, we want to use the thread id, but for now, just use 0. diff --git a/src/apex/apex.hpp b/src/apex/apex.hpp index a93cddda..afd0e6a5 100644 --- a/src/apex/apex.hpp +++ b/src/apex/apex.hpp @@ -174,7 +174,6 @@ class apex void stop_all_policy_handles(void); bool policy_handle_exists(apex_policy_handle* handle); void complete_task(std::shared_ptr task_wrapper_ptr); - static void stop_internal(profiler* p); ~apex(); std::atomic finalizing; }; diff --git a/src/apex/apex_api.hpp b/src/apex/apex_api.hpp index 992f5a6e..063aadeb 100644 --- a/src/apex/apex_api.hpp +++ b/src/apex/apex_api.hpp @@ -357,6 +357,11 @@ APEX_EXPORT std::shared_ptr new_task( const uint64_t task_id = UINTMAX_MAX, const std::shared_ptr parent_task = null_task_wrapper); +APEX_EXPORT std::shared_ptr new_task( + const std::string &name, + const uint64_t task_id, + const std::vector> parent_tasks); + /** \brief Create a new task (dependency). @@ -375,6 +380,11 @@ APEX_EXPORT std::shared_ptr new_task( const uint64_t task_id = UINTMAX_MAX, const std::shared_ptr parent_task = null_task_wrapper); +APEX_EXPORT std::shared_ptr new_task( + const apex_function_address function_address, + const uint64_t task_id, + const std::vector> parent_tasks); + /** \brief Update a task (dependency). @@ -856,6 +866,8 @@ APEX_EXPORT void send (uint64_t tag, uint64_t size, uint64_t target); APEX_EXPORT void recv (uint64_t tag, uint64_t size, uint64_t source_rank, uint64_t source_thread); +APEX_EXPORT bool& get_program_over(void); + /** \brief A convenience class for using APEX in C++ applications. diff --git a/src/apex/apex_error_handling.cpp b/src/apex/apex_error_handling.cpp index 19d076af..be18f794 100644 --- a/src/apex/apex_error_handling.cpp +++ b/src/apex/apex_error_handling.cpp @@ -159,3 +159,74 @@ void apex_test_signal_handler() { apex_custom_signal_handler(1); } +std::vector& profilers_to_exit(void) { + static std::vector _thevector; + return _thevector; +} + +std::atomic threads_to_exit_count{0}; + +//static void apex_custom_signal_handler_thread_exit([[maybe_unused]] int sig) { +static void apex_custom_signal_handler_thread_exit( + [[maybe_unused]] int sig, + [[maybe_unused]] siginfo_t * info, + [[maybe_unused]] void * context) { + APEX_ASSERT(sig == SIGUSR2); + auto p = apex::thread_instance::instance().get_current_profiler(); + apex::profiler* parent = nullptr; + while(p != nullptr) { + if (p->untied_parent == nullptr || p->untied_parent->tt_ptr->state != apex::task_wrapper::RUNNING) { + parent = nullptr; + } else { + parent = p->untied_parent; + } + // only push profilers that were started on THIS thread... + if (p != nullptr && p->thread_id == apex::thread_instance::instance().get_id()) { + profilers_to_exit().push_back(p); + } + p = parent; + } + threads_to_exit_count--; + return; +} + +int apex_register_thread_cleanup(void) { + static bool doOnce{false}; + if (doOnce) return 0; + struct sigaction act; + struct sigaction old; + memset(&act, 0, sizeof(act)); + memset(&old, 0, sizeof(old)); + sigemptyset(&act.sa_mask); + std::array mysignals = { SIGUSR2 }; + act.sa_flags = SA_RESTART | SA_SIGINFO; + act.sa_sigaction = apex_custom_signal_handler_thread_exit; + for (auto s : mysignals) { + sigaction(s, &act, &old); +#ifdef APEX_BE_GOOD_CITIZEN + other_handlers[s] = old; +#endif + } + doOnce = true; + return 0; +} + +void apex_signal_all_threads(void) { + auto tids = apex::thread_instance::gettids(); + pthread_t me = pthread_self(); + // generous...but not a hard limit. Trying not to allocate memory during the signal handler, above. + profilers_to_exit().reserve(tids.size() * 10); + // don't include myself + threads_to_exit_count = tids.size() - 1; + for (auto t : tids) { + if (t != me) { + pthread_kill(t,SIGUSR2); + } + } + while(threads_to_exit_count > 0) { //wait! + } + for (auto p : profilers_to_exit()) { + apex::stop(p); + } + profilers_to_exit().clear(); +} diff --git a/src/apex/apex_error_handling.hpp b/src/apex/apex_error_handling.hpp index a9748ac3..cae14c81 100644 --- a/src/apex/apex_error_handling.hpp +++ b/src/apex/apex_error_handling.hpp @@ -11,5 +11,8 @@ void apex_print_backtrace(); void apex_custom_signal_handler(int sig); int apex_register_signal_handler(); +int apex_register_thread_cleanup(); +int apex_custom_signal_handler_thread_exit(int sig); void apex_test_signal_handler(); +void apex_signal_all_threads(); diff --git a/src/apex/apex_kokkos.cpp b/src/apex/apex_kokkos.cpp index df6ae085..5e8007b8 100644 --- a/src/apex/apex_kokkos.cpp +++ b/src/apex/apex_kokkos.cpp @@ -148,7 +148,7 @@ void kokkosp_init_library(int loadseq, uint64_t version, void kokkosp_finalize_library() { memory_mtx.lock(); if (memory_map().size() == 0) { - if (apex::apex::instance()->get_node_id() == 0) { + if (apex::apex::instance()->get_node_id() == 0 && apex::apex_options::use_verbose()) { std::cout << "No Kokkos allocation Leaks on rank 0!" << std::endl; } } else { diff --git a/src/apex/apex_kokkos_tuning.cpp b/src/apex/apex_kokkos_tuning.cpp index 5b034dfa..2258a85d 100644 --- a/src/apex/apex_kokkos_tuning.cpp +++ b/src/apex/apex_kokkos_tuning.cpp @@ -1225,7 +1225,7 @@ void kokkosp_request_values( if (!apex::apex_options::use_kokkos_tuning()) { return; } // first, get the current timer node in the task tree //auto tlt = apex::thread_instance::get_top_level_timer(); - auto tlt = apex::thread_instance::get_current_profiler(); + auto tlt = apex::thread_instance::instance().get_current_profiler(); std::string tree_node{"default"}; if (tlt != nullptr) { //tree_node = tlt->tt_ptr->tree_node->getName(); diff --git a/src/apex/apex_preload.cpp b/src/apex/apex_preload.cpp index 48f2b979..e3a87f38 100644 --- a/src/apex/apex_preload.cpp +++ b/src/apex/apex_preload.cpp @@ -51,15 +51,37 @@ static int (*main_real)(int, char**, char**); int apex_preload_main(int argc, char** argv, char** envp) { // FIRST! check to see if this is a bash script. if so, DO NOTHING size_t len{strlen(argv[0])}; - if (len > 4 && strncmp(argv[0] + (len - 4), "bash", 4) == 0) { + // little lambda for making sure we wrap the right executable + const auto validate_argv0 = [&](const char * needle){ + size_t needle_len{strlen(needle)}; + if (len > needle_len && + (strncmp(argv[0] + (len - needle_len), needle, needle_len)) == 0) { + fputs("apex: skipping ", stderr); + fputs(argv[0], stderr); + fputs("!\n", stderr); + return true; + } + fputs("apex: executing ", stderr); + fputs(argv[0], stderr); + fputs("!\n", stderr); + return false; + }; + if (validate_argv0("tclsh8.6")) { + return main_real(argc, argv, envp); + } + if (validate_argv0("bash")) { + return main_real(argc, argv, envp); + } + // Next! check to see if this is a [t]csh script. if so, DO NOTHING + else if (validate_argv0("csh")) { return main_real(argc, argv, envp); } - // FIRST! check to see if this is a [t]csh script. if so, DO NOTHING - if (len > 3 && strncmp(argv[0] + (len - 3), "csh", 3) == 0) { + // Then! check to see if this is gdb. if so, DO NOTHING (should get caught by the apex_exec script though) + else if (validate_argv0("gdb")) { return main_real(argc, argv, envp); } - // FIRST! check to see if this is gdb. if so, DO NOTHING (should get caught by the apex_exec script though) - if (len > 3 && strncmp(argv[0] + (len - 3), "gdb", 3) == 0) { + // finally! check for a fork from cuda-gdb + else if (validate_argv0("NvDebugAgent")) { return main_real(argc, argv, envp); } // prevent re-entry diff --git a/src/apex/apex_types.h b/src/apex/apex_types.h index ab3f4e8c..bfa7a8f9 100644 --- a/src/apex/apex_types.h +++ b/src/apex/apex_types.h @@ -270,12 +270,12 @@ inline unsigned int sc_nprocessors_onln(void) macro (APEX_SUSPEND, suspend, bool, false, "Suspend APEX timers and counters during the application execution") \ macro (APEX_PAPI_SUSPEND, papi_suspend, bool, false, "Suspend PAPI counters during the application execution") \ macro (APEX_PROCESS_ASYNC_STATE, process_async_state, bool, true, "Enable/disable asynchronous processing of statistics (useful when only collecting trace data)") \ - macro (APEX_UNTIED_TIMERS, untied_timers, bool, false, "Disable callstack state maintenance for specific OS threads. This allows APEX timers to start on one thread and stop on another. This is not compatible with tracing.") \ macro (APEX_TAU, use_tau, bool, false, "Enable TAU profiling (if application is executed with tau_exec).") \ macro (APEX_OTF2, use_otf2, bool, false, "Enable OTF2 trace output.") \ macro (APEX_OTF2_COLLECTIVE_SIZE, otf2_collective_size, int, 1, "") \ - macro (APEX_TRACE_EVENT, use_trace_event, bool, false, "Enable Google Trace Event output. (deprecated, please use APEX_PERFETTO)") \ - macro (APEX_PERFETTO, use_perfetto, bool, false, "Enable Perfetto Trace output.") \ + macro (APEX_TRACE_EVENT, use_trace_event, bool, false, "Enable Google Trace Event output.") \ + macro (APEX_TRACE_THREAD_FLOW, use_thread_flow, bool, false, "Enable flow events between tasks on same thread.") \ + macro (APEX_PERFETTO, use_perfetto, bool, false, "Enable Perfetto Trace output (not recommended, use Google Trace Event instead).") \ macro (APEX_POLICY, use_policy, bool, true, "Enable APEX policy listener and execute registered policies.") \ macro (APEX_MEASURE_CONCURRENCY, use_concurrency, int, 0, "Periodically sample thread activity and output report at exit.") \ macro (APEX_MEASURE_CONCURRENCY_MAX_TIMERS, concurrency_max_timers, int, 5, "Maximum number of timers in the concurrency report.") \ diff --git a/src/apex/cupti_trace.cpp b/src/apex/cupti_trace.cpp index 571fcf6c..abbf3023 100644 --- a/src/apex/cupti_trace.cpp +++ b/src/apex/cupti_trace.cpp @@ -2066,7 +2066,14 @@ void apex_init_cuda_tracing() { if (!apex::apex_options::use_cuda()) { return; } apex_flush_cuda_tracing(); CUPTI_CALL(cuptiUnsubscribe(subscriber)); - CUPTI_CALL(cuptiFinalize()); + uint32_t version{0}; + CUPTI_CALL(cuptiGetVersion(&version)); + // cupti 12 introduced a bug that was fixed in version 12.4. + // see https://forums.developer.nvidia.com/t/cuda-profiler-tools-interface-cupti-for-cuda-toolkit-12-4-is-now-available/279799 + //printf("Cupti version: %d\n", version); + if (version < 18 || version > 21) { + CUPTI_CALL(cuptiFinalize()); + } // get_range_map().clear(); //std::cout << "* * * * * * * * EXITING CUPTI SUPPORT * * * * * * * * * *" << std::endl; allGood = false; diff --git a/src/apex/dependency_tree.cpp b/src/apex/dependency_tree.cpp index 19e7cbb2..9ee1debe 100644 --- a/src/apex/dependency_tree.cpp +++ b/src/apex/dependency_tree.cpp @@ -23,27 +23,34 @@ std::mutex Node::treeMutex; std::atomic Node::nodeCount{0}; std::set Node::known_metrics; -Node* Node::appendChild(task_identifier* c) { +std::shared_ptr Node::appendChild(task_identifier* c, std::shared_ptr existing) { treeMutex.lock(); auto iter = children.find(*c); if (iter == children.end()) { - auto n = new Node(c,this); - //std::cout << "Inserting " << c->get_name() << std::endl; - children.insert(std::make_pair(*c,n)); - treeMutex.unlock(); - return n; + if (existing != nullptr) { + existing->parents.push_back(shared_from_this()); + children.insert(std::make_pair(*c,existing)); + treeMutex.unlock(); + return existing; + } else { + auto n = std::make_shared(c,shared_from_this()); + //std::cout << "Inserting " << c->get_name() << std::endl; + children.insert(std::make_pair(*c,n)); + treeMutex.unlock(); + return n; + } } iter->second->count++; treeMutex.unlock(); return iter->second; } -Node* Node::replaceChild(task_identifier* old_child, task_identifier* new_child) { +std::shared_ptr Node::replaceChild(task_identifier* old_child, task_identifier* new_child) { treeMutex.lock(); auto olditer = children.find(*old_child); // not found? shouldn't happen... if (olditer == children.end()) { - auto n = new Node(new_child,this); + auto n = std::make_shared(new_child,shared_from_this()); //std::cout << "Inserting " << new_child->get_name() << std::endl; children.insert(std::make_pair(*new_child,n)); treeMutex.unlock(); @@ -57,7 +64,7 @@ Node* Node::replaceChild(task_identifier* old_child, task_identifier* new_child) auto newiter = children.find(*new_child); // not found? shouldn't happen... if (newiter == children.end()) { - auto n = new Node(new_child,this); + auto n = std::make_shared(new_child,shared_from_this()); //std::cout << "Inserting " << new_child->get_name() << std::endl; children.insert(std::make_pair(*new_child,n)); treeMutex.unlock(); @@ -68,11 +75,16 @@ Node* Node::replaceChild(task_identifier* old_child, task_identifier* new_child) } void Node::writeNode(std::ofstream& outfile, double total) { + static std::set> processed; + if (processed.count(shared_from_this())) return; + processed.insert(shared_from_this()); static size_t depth = 0; // Write out the relationships - if (parent != nullptr) { - outfile << " \"" << parent->getIndex() << "\" -> \"" << getIndex() << "\";"; - outfile << std::endl; + for(auto& parent : parents) { + if (parent != nullptr) { + outfile << " \"" << parent->getIndex() << "\" -> \"" << getIndex() << "\";"; + outfile << std::endl; + } } double acc = (data == task_identifier::get_main_task_id() || getAccumulated() == 0.0) ? @@ -115,7 +127,7 @@ void Node::writeNode(std::ofstream& outfile, double total) { depth--; } -bool cmp(std::pair& a, std::pair& b) { +bool cmp(std::pair>& a, std::pair>& b) { return a.second->getAccumulated() > b.second->getAccumulated(); } @@ -152,7 +164,7 @@ double Node::writeNodeASCII(std::ofstream& outfile, double total, size_t indent) outfile << std::endl; // sort the children by accumulated time - std::vector > sorted; + std::vector> > sorted; for (auto& it : children) { sorted.push_back(it); } @@ -337,13 +349,25 @@ void Node::addAccumulated(double value, double incl, bool is_resume, uint64_t th m.unlock(); } -double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id, int num_papi_counters) { +double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id, int num_papi_counters, bool topLevel) { static size_t depth = 0; + static std::set> processed; + // because dump can get called multiple times during execution, we need + // to reset the set of processed nodes. + if (topLevel) { processed.clear(); } + if (processed.count(shared_from_this())) return getAccumulated(); + processed.insert(shared_from_this()); APEX_ASSERT(total > 0.0); // write out the node id and graph node index and the name - outfile << node_id << "," << index << ","; - outfile << ((parent == nullptr) ? 0 : parent->index) << ","; - outfile << depth << ",\""; + outfile << node_id << "," << index << ",\"["; + std::string delim(""); + for (auto& parent : parents) { + if (parent != nullptr) { + outfile << delim << parent->index; + delim = ","; + } + } + outfile << "]\"," << depth << ",\""; outfile << data->get_tree_name() << "\","; // write out the accumulated double acc = (data == task_identifier::get_main_task_id() || getAccumulated() == 0.0) ? @@ -411,7 +435,7 @@ double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id, outfile << std::endl; // sort the children by name to make tree merging easier (I hope) - std::vector sorted; + std::vector> sorted; for (auto& it : children) { sorted.push_back(it.second); } diff --git a/src/apex/dependency_tree.hpp b/src/apex/dependency_tree.hpp index 4da9dc69..f1131350 100644 --- a/src/apex/dependency_tree.hpp +++ b/src/apex/dependency_tree.hpp @@ -45,10 +45,10 @@ class metricStorage { } }; -class Node { +class Node : public std::enable_shared_from_this { private: task_identifier* data; - Node* parent; + std::vector> parents; size_t count; apex_profile prof; //double calls; @@ -59,16 +59,17 @@ class Node { double inclusive; size_t index; std::set thread_ids; - std::unordered_map children; + std::unordered_map> children; // map for arbitrary metrics std::map metric_map; static std::mutex treeMutex; static std::atomic nodeCount; static std::set known_metrics; public: - Node(task_identifier* id, Node* p) : - data(id), parent(p), count(1), inclusive(0), + Node(task_identifier* id, std::shared_ptr p) : + data(id), count(1), inclusive(0), index(nodeCount.fetch_add(1, std::memory_order_relaxed)) { + parents.push_back(p); prof.calls = 0.0; prof.accumulated = 0.0; prof.minimum = 0.0; @@ -76,17 +77,11 @@ class Node { prof.sum_squares = 0.0; memset(prof.papi_metrics, 0, sizeof(double)*8); } - ~Node() { - treeMutex.lock(); - for (auto c : children) { - delete c.second; - } - treeMutex.unlock(); - } - Node* appendChild(task_identifier* c); - Node* replaceChild(task_identifier* old_child, task_identifier* new_child); + // nothing to destruct? because we used smart pointers for parents/children... + ~Node() { } + std::shared_ptr appendChild(task_identifier* c, std::shared_ptr existing); + std::shared_ptr replaceChild(task_identifier* old_child, task_identifier* new_child); task_identifier* getData() { return data; } - Node* getParent() { return parent; } size_t getCount() { return count; } inline double& getCalls() { return prof.calls; } inline double& getAccumulated() { return prof.accumulated; } @@ -100,7 +95,7 @@ class Node { std::string getName() const { return data->get_name(); }; void writeNode(std::ofstream& outfile, double total); double writeNodeASCII(std::ofstream& outfile, double total, size_t indent); - double writeNodeCSV(std::stringstream& outfile, double total, int node_id, int num_papi_counters); + double writeNodeCSV(std::stringstream& outfile, double total, int node_id, int num_papi_counters, bool topLevel = false); double writeNodeJSON(std::ofstream& outfile, double total, size_t indent); void writeTAUCallpath(std::ofstream& outfile, std::string prefix); static size_t getNodeCount() { @@ -111,8 +106,10 @@ class Node { return known_metrics; } // required for using this class as a key in a map, vector, etc. - static bool compareNodeByParentName (const Node* lhs, const Node* rhs) { - if (lhs->parent->index < rhs->parent->index) { + static bool compareNodeByParentName (const std::shared_ptr lhs, const std::shared_ptr rhs) { + if (lhs == nullptr) return true; + if (rhs == nullptr) return false; + if (lhs->parents[0]->index < rhs->parents[0]->index) { return true; } if (lhs->getName().compare(rhs->getName()) < 0) { diff --git a/src/apex/memory_wrapper.cpp b/src/apex/memory_wrapper.cpp index e9b03199..0d26f184 100644 --- a/src/apex/memory_wrapper.cpp +++ b/src/apex/memory_wrapper.cpp @@ -124,7 +124,7 @@ void recordAlloc(const size_t bytes, const void* ptr, static book_t& book = getBook(); double value = (double)(bytes); if (cpu) sample_value("Memory: Bytes Allocated", value, true); - profiler * p = thread_instance::instance().get_current_profiler(); + auto p = thread_instance::instance().get_current_profiler(); record_t tmp(value, thread_instance::instance().get_id(), alloc, cpu); if (p != nullptr) { tmp.id = p->get_task_id(); } //backtrace_record_t rec(3,tmp.backtrace); @@ -168,7 +168,7 @@ void recordFree(const void* ptr, const bool cpu) { double value = (double)(bytes); if (cpu) sample_value("Memory: Bytes Freed", value, true); book.totalAllocated.fetch_sub(bytes, std::memory_order_relaxed); - profiler * p = thread_instance::instance().get_current_profiler(); + auto p = thread_instance::instance().get_current_profiler(); if (p == nullptr) { auto i = apex::instance(); // might be after finalization, so double-check! @@ -186,7 +186,7 @@ void recordFree(const void* ptr, const bool cpu) { /* This doesn't belong here, but whatevs */ void recordMetric(std::string name, double value) { in_apex prevent_memory_tracking; - profiler * p = thread_instance::instance().get_current_profiler(); + auto p = thread_instance::instance().get_current_profiler(); if (p != nullptr) { p->metric_map[name] = value; } diff --git a/src/apex/otf2_listener.cpp b/src/apex/otf2_listener.cpp index df351d79..fa403a7e 100644 --- a/src/apex/otf2_listener.cpp +++ b/src/apex/otf2_listener.cpp @@ -1228,7 +1228,9 @@ namespace apex { OTF2_AttributeList * al = OTF2_AttributeList_New(); // create an attribute OTF2_AttributeList_AddUint64( al, 0, tt_ptr->guid ); - OTF2_AttributeList_AddUint64( al, 1, tt_ptr->parent_guid ); + for (auto& parent : tt_ptr->parents) { + OTF2_AttributeList_AddUint64( al, 1, parent->guid ); + } uint64_t idx = get_region_index(id); uint64_t stamp = 0L; if (thread_instance::get_id() == 0) { @@ -1283,7 +1285,9 @@ namespace apex { OTF2_AttributeList * al = OTF2_AttributeList_New(); // create an attribute OTF2_AttributeList_AddUint64( al, 0, p->tt_ptr->guid ); - OTF2_AttributeList_AddUint64( al, 1, p->tt_ptr->parent_guid ); + for (auto& parent : p->tt_ptr->parents) { + OTF2_AttributeList_AddUint64( al, 1, parent->guid ); + } // unfortunately, we can't use the timestamp from the // profiler object. bummer. it has to be taken after // the lock is acquired, so that events happen on @@ -2744,7 +2748,9 @@ namespace apex { OTF2_AttributeList * al = OTF2_AttributeList_New(); // create an attribute OTF2_AttributeList_AddUint64( al, 0, p->tt_ptr->guid ); - OTF2_AttributeList_AddUint64( al, 1, p->tt_ptr->parent_guid ); + for (auto& parent : p->tt_ptr->parents) { + OTF2_AttributeList_AddUint64( al, 1, parent->guid ); + } OTF2_EC(OTF2_EvtWriter_Enter( local_evt_writer, al, stamp, idx /* region */ )); stamp = p->get_stop_ns() - globalOffset; diff --git a/src/apex/perfetto_listener.cpp b/src/apex/perfetto_listener.cpp index 78182e8d..0debb6ed 100644 --- a/src/apex/perfetto_listener.cpp +++ b/src/apex/perfetto_listener.cpp @@ -124,12 +124,16 @@ void perfetto_listener::on_exit_thread(event_data &data) { inline bool perfetto_listener::_common_start(std::shared_ptr &tt_ptr) { APEX_UNUSED(tt_ptr); + uint64_t pguid = tt_ptr->guid; + if (tt_ptr->parents[0] != nullptr) { + pguid = tt_ptr->parents[0]->guid; + } TRACE_EVENT_BEGIN(_category, perfetto::DynamicString{tt_ptr->get_task_id()->get_name()}, //perfetto::ProcessTrack::Current(), (uint64_t)tt_ptr->prof->get_start_ns(), _guid, tt_ptr->guid, - _pguid, tt_ptr->parent_guid); + _pguid, pguid); return true; } @@ -202,8 +206,8 @@ void perfetto_listener::on_async_event(base_thread_node &node, std::shared_ptr &p, const async_event_data& data) { const size_t tid{make_tid(node)}; uint64_t pguid = 0; - if (p->tt_ptr != nullptr && p->tt_ptr->parent != nullptr) { - pguid = p->tt_ptr->parent->guid; + if (p->tt_ptr != nullptr && p->tt_ptr->parents[0] != nullptr) { + pguid = p->tt_ptr->parents[0]->guid; } TRACE_EVENT_BEGIN(_category, perfetto::DynamicString{p->get_task_id()->get_name()}, diff --git a/src/apex/profile_reducer.cpp b/src/apex/profile_reducer.cpp index 32573359..d5123efd 100644 --- a/src/apex/profile_reducer.cpp +++ b/src/apex/profile_reducer.cpp @@ -133,10 +133,12 @@ std::map reduce_profiles_for_screen() { if (mpi_initialized && commsize > 1) { MPI_CALL(PMPI_Allgather(sbuf, sbuf_length, MPI_CHAR, rbuf, sbuf_length, MPI_CHAR, MPI_COMM_WORLD)); + free(sbuf); } else { #else if (true) { #endif + // free the unused receive buffer free(rbuf); rbuf = sbuf; } @@ -149,6 +151,7 @@ std::map reduce_profiles_for_screen() { //DEBUG_PRINT("%d Inserting: %s\n", commrank, tmp.c_str()); } } + free(rbuf); // the set is already sorted?... //sort(all_names.begin(), all_names.end()); @@ -207,11 +210,15 @@ std::map reduce_profiles_for_screen() { if (mpi_initialized && commsize > 1) { MPI_CALL(PMPI_Gather(s_pdata, sbuf_length, MPI_DOUBLE, r_pdata, sbuf_length, MPI_DOUBLE, 0, MPI_COMM_WORLD)); + // free the send buffer + free(s_pdata); } else { #else if (true) { #endif + // free the unused "receive" buffer free(r_pdata); + // point to the send buffer r_pdata = s_pdata; } @@ -265,6 +272,8 @@ std::map reduce_profiles_for_screen() { dptr = &(dptr[num_fields]); } } + // free the buffer, receive buffer if MPI, send if not + free (r_pdata); } #if defined(APEX_WITH_MPI) || \ diff --git a/src/apex/profiler.hpp b/src/apex/profiler.hpp index 4f35ae9b..3ff6a1fb 100644 --- a/src/apex/profiler.hpp +++ b/src/apex/profiler.hpp @@ -38,11 +38,12 @@ class disabled_profiler_exception : public std::exception { } }; -class profiler { +class profiler : public std::enable_shared_from_this { private: task_identifier * task_id; // for counters, timers public: std::shared_ptr tt_ptr; // for timers + profiler* untied_parent; // for timer stack handling with untied timers uint64_t start_ns; uint64_t end_ns; #if APEX_HAVE_PAPI @@ -61,7 +62,7 @@ class profiler { reset_type is_reset; bool stopped; // needed for correct Hatchet output - uint64_t thread_id; + uint64_t thread_id; // saved at timer start std::map metric_map; task_identifier * get_task_id(void) { return task_id; @@ -75,6 +76,7 @@ class profiler { reset_type reset = reset_type::NONE) : task_id(task->get_task_id()), tt_ptr(task), + untied_parent(nullptr), start_ns(our_clock::now_ns()), #if APEX_HAVE_PAPI papi_start_values{0,0,0,0,0,0,0,0}, @@ -86,7 +88,9 @@ class profiler { guid(task->guid), is_counter(false), is_resume(resume), - is_reset(reset), stopped(false) { + is_reset(reset), stopped(false), + thread_id(task->thread_id) { + //printf("constructor! %p\n", this); fflush(stdout); task->prof = this; task->start_ns = start_ns; } @@ -96,6 +100,7 @@ class profiler { reset_type reset = reset_type::NONE) : task_id(id), tt_ptr(nullptr), + untied_parent(nullptr), start_ns(our_clock::now_ns()), #if APEX_HAVE_PAPI papi_start_values{0,0,0,0,0,0,0,0}, @@ -125,6 +130,7 @@ class profiler { is_reset(reset_type::NONE), stopped(true) { }; //copy constructor profiler(const profiler& in) : + std::enable_shared_from_this(in), task_id(in.task_id), tt_ptr(in.tt_ptr), start_ns(in.start_ns), @@ -142,7 +148,7 @@ class profiler { stopped(in.stopped), thread_id(in.thread_id) { - //printf("COPY!\n"); fflush(stdout); + //printf("COPY! %p -> %p\n", &in, this); fflush(stdout); #if APEX_HAVE_PAPI for (int i = 0 ; i < 8 ; i++) { papi_start_values[i] = in.papi_start_values[i]; @@ -150,7 +156,9 @@ class profiler { } #endif } - ~profiler(void) { /* not much to do here. */ }; + ~profiler(void) { /* not much to do here. */ + //printf("destructor! %p\n", this); fflush(stdout); + }; // for "yield" support void set_start(uint64_t timestamp) { start_ns = timestamp; @@ -272,6 +280,7 @@ class profiler { return start_ns - get_global_start(); } } + std::shared_ptr Get() {return shared_from_this();} }; } diff --git a/src/apex/profiler_listener.cpp b/src/apex/profiler_listener.cpp index 61398822..fdcb9411 100644 --- a/src/apex/profiler_listener.cpp +++ b/src/apex/profiler_listener.cpp @@ -153,7 +153,7 @@ std::unordered_set free_profiles; double non_idle_time = 0.0; /* Iterate over all timers and accumulate the time spent in them */ unordered_map::const_iterator it2; - std::unique_lock task_map_lock(_task_map_mutex); + std::lock_guard task_map_lock(_task_map_mutex); for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { profile * p = it2->second; if (apex_options::throttle_timers()) { @@ -238,7 +238,7 @@ std::unordered_set free_profiles; } return theprofile; } - std::unique_lock task_map_lock(_task_map_mutex); + std::lock_guard task_map_lock(_task_map_mutex); unordered_map::const_iterator it = task_map.find(id); if (it != task_map.end()) { return (*it).second; @@ -247,7 +247,7 @@ std::unordered_set free_profiles; } void profiler_listener::reset_all(void) { - std::unique_lock task_map_lock(_task_map_mutex); + std::lock_guard task_map_lock(_task_map_mutex); for(auto &it : task_map) { it.second->reset(); } @@ -289,13 +289,13 @@ std::unordered_set free_profiles; } } #endif - std::unique_lock task_map_lock(_task_map_mutex); + _task_map_mutex.lock(); unordered_map::const_iterator it = task_map.find(*(p.get_task_id())); if (it != task_map.end()) { // A profile for this ID already exists. theprofile = (*it).second; - task_map_lock.unlock(); + _task_map_mutex.unlock(); if(p.is_reset == reset_type::CURRENT) { theprofile->reset(); } else { @@ -360,7 +360,7 @@ std::unordered_set free_profiles; p.is_counter ? APEX_COUNTER : APEX_TIMER); task_map[*(p.get_task_id())] = theprofile; } - task_map_lock.unlock(); + _task_map_mutex.unlock(); #ifdef APEX_HAVE_HPX #ifdef APEX_REGISTER_HPX3_COUNTERS if(!_done) { @@ -394,7 +394,7 @@ std::unordered_set free_profiles; /* before calling p.get_task_id()->get_name(), make sure we create * a thread_instance object that is NOT a worker. */ thread_instance::instance(false); - std::unique_lock task_map_lock(_mtx); + std::lock_guard task_map_lock(_mtx); task_scatterplot_samples << std::fixed << std::setprecision(0) << p.normalized_timestamp() << " " << p.elapsed() << " " @@ -408,7 +408,7 @@ std::unordered_set free_profiles; } } else { thread_instance::instance(false); - std::unique_lock task_map_lock(_mtx); + std::lock_guard task_map_lock(_mtx); counter_scatterplot_samples << std::fixed << std::setprecision(0) #ifdef APEX_HAVE_PROC @@ -468,7 +468,7 @@ std::unordered_set free_profiles; void profiler_listener::delete_profiles(void) { // iterate over the map and free the objects in the map unordered_map::const_iterator it; - std::unique_lock task_map_lock(_task_map_mutex); + std::lock_guard task_map_lock(_task_map_mutex); for(it = task_map.begin(); it != task_map.end(); it++) { delete it->second; } @@ -711,7 +711,7 @@ std::unordered_set free_profiles; #endif // wait for profiles to update std::this_thread::sleep_for(std::chrono::microseconds(100)); - total_time = get_profile(main_id); + total_time = get_profile(*main_id); } #endif // APEX_SYNCHRONOUS_PROCESSING double wall_clock_main = (total_time != nullptr) ? total_time->get_accumulated_seconds() : 0.0; @@ -746,7 +746,7 @@ std::unordered_set free_profiles; std::vector id_vector; // iterate over the counters, and sort their names { - std::unique_lock task_map_lock(_task_map_mutex); + std::lock_guard task_map_lock(_task_map_mutex); for(auto it2 : all_profiles) { std::string name = it2.first; apex_profile * p = it2.second; @@ -1031,7 +1031,7 @@ std::unordered_set free_profiles; // output nodes with "main" [shape=box; style=filled; fillcolor="#ff0000" ]; unordered_map::const_iterator it; - std::unique_lock task_map_lock(_task_map_mutex); + std::lock_guard task_map_lock(_task_map_mutex); for(it = task_map.begin(); it != task_map.end(); it++) { profile * p = it->second; // shouldn't happen, but? @@ -1217,7 +1217,7 @@ std::unordered_set free_profiles; header_stream << "\n"; //} stringstream tree_stream; - root->tree_node->writeNodeCSV(tree_stream, wall_clock_main, node_id, num_papi_counters); + root->tree_node->writeNodeCSV(tree_stream, wall_clock_main, node_id, num_papi_counters, true); std::string filename{"apex_tasktree.csv"}; reduce_profiles(header_stream, tree_stream, filename, false); } @@ -1239,19 +1239,20 @@ std::unordered_set free_profiles; // Determine number of counter events, as these need to be // excluded from the number of normal timers unordered_map::const_iterator it2; + size_t function_count = 0; { - std::unique_lock task_map_lock(_task_map_mutex); + std::lock_guard task_map_lock(_task_map_mutex); for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { profile * p = it2->second; if(p->get_type() == APEX_COUNTER) { counter_events++; } } - } - size_t function_count = task_map.size() - counter_events; - if (apex_options::use_tasktree_output() || apex_options::use_hatchet_output()) { - auto root = task_wrapper::get_apex_main_wrapper(); - function_count += (root->tree_node->getNodeCount() - 1); + function_count = task_map.size() - counter_events; + if (apex_options::use_tasktree_output() || apex_options::use_hatchet_output()) { + auto root = task_wrapper::get_apex_main_wrapper(); + function_count += (root->tree_node->getNodeCount() - 1); + } } // Print the normal timers to the profile file @@ -1266,7 +1267,7 @@ std::unordered_set free_profiles; profile * mainp = nullptr; double not_main = 0.0; { - std::unique_lock task_map_lock(_task_map_mutex); + std::lock_guard task_map_lock(_task_map_mutex); for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { profile * p = it2->second; task_identifier task_id = it2->first; @@ -1301,6 +1302,7 @@ std::unordered_set free_profiles; if(counter_events > 0) { myfile << counter_events << " userevents" << endl; myfile << "# eventname numevents max min mean sumsqr" << endl; + std::lock_guard task_map_lock(_task_map_mutex); for(it2 = task_map.begin(); it2 != task_map.end(); it2++) { profile * p = it2->second; if(p->get_type() == APEX_COUNTER) { @@ -1784,6 +1786,10 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl if (apex_options::process_async_state()) { finalize_profiles(data, reduced); } + for (auto itr : reduced) { + free(itr.second); + } + reduced.clear(); } if (apex_options::use_taskgraph_output()) { @@ -1895,8 +1901,10 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl //std::shared_ptr p = std::make_shared(tt_ptr, //is_resume); // get the right task identifier, based on whether there are aliases - profiler * p = new profiler(tt_ptr, is_resume); + profiler* p = new profiler(tt_ptr, is_resume); + //std::shared_ptr p = std::make_shared(p_prime); p->thread_id = _pls.my_tid; + APEX_ASSERT(p->thread_id == (unsigned int)thread_instance::get_id()); p->guid = tt_ptr->guid; thread_instance::instance().set_current_profiler(p); #if APEX_HAVE_PAPI @@ -2053,8 +2061,8 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl // get the right task identifier, based on whether there are aliases task_identifier * id = tt_ptr->get_task_id(); // if the parent task is not null, use it (obviously) - if (tt_ptr->parent != nullptr) { - task_identifier * pid = tt_ptr->parent->get_task_id(); + for (auto& parent : tt_ptr->parents) { + task_identifier * pid = parent->get_task_id(); dependency_queue()->enqueue(new task_dependency(pid, id)); return; } @@ -2165,18 +2173,18 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl } void profiler_listener::stop_main_timer(void) { - if (!_main_timer_stopped) { + if (!_main_timer_stopped && !main_timer->stopped) { APEX_ASSERT(main_timer != nullptr); main_timer->stop(true); //_common_stop(main_timer, false); //on_task_complete(main_timer->tt_ptr); _main_timer_stopped = true; + push_profiler((unsigned int)thread_instance::get_id(), *main_timer); } } void profiler_listener::on_pre_shutdown(void) { stop_main_timer(); - push_profiler((unsigned int)thread_instance::get_id(), *main_timer); } void profiler_listener::push_profiler_public(std::shared_ptr &p) { diff --git a/src/apex/profiler_listener.hpp b/src/apex/profiler_listener.hpp index 7f4c141c..b481e0b3 100644 --- a/src/apex/profiler_listener.hpp +++ b/src/apex/profiler_listener.hpp @@ -230,14 +230,13 @@ class profiler_listener : public event_listener { profile * get_idle_rate(void); std::vector& get_available_profiles() { static std::vector ids; - _task_map_mutex.lock(); + std::lock_guard lock(_task_map_mutex); if (task_map.size() > ids.size()) { ids.clear(); for (auto kv : task_map) { ids.push_back(kv.first); } } - _task_map_mutex.unlock(); return ids; } void process_profiles(void); diff --git a/src/apex/task_wrapper.hpp b/src/apex/task_wrapper.hpp index eb06a995..11f8d737 100644 --- a/src/apex/task_wrapper.hpp +++ b/src/apex/task_wrapper.hpp @@ -35,6 +35,12 @@ struct task_wrapper : public hpx::util::external_timer::task_wrapper { #else struct task_wrapper { #endif + typedef enum e_task_state { + CREATED, + RUNNING, + YIELDED, + STOPPED + } task_state_t; /** \brief A pointer to the task_identifier for this task_wrapper. */ @@ -47,18 +53,14 @@ struct task_wrapper { \brief An internally generated GUID for this task. */ uint64_t guid; -/** - \brief An internally generated GUID for the parent task of this task. - */ - uint64_t parent_guid; /** \brief A managed pointer to the parent task_wrapper for this task. */ - std::shared_ptr parent; + std::vector> parents; /** \brief A node in the task tree representing this task type */ - dependency::Node* tree_node; + std::shared_ptr tree_node; /** \brief Internal usage, used to manage HPX direct actions when their parent task is yielded by the runtime. @@ -86,6 +88,7 @@ struct task_wrapper { \brief Whether this event requires separate start/end events in gtrace */ bool explicit_trace_start; + task_state_t state; /** \brief Constructor. */ @@ -93,14 +96,21 @@ struct task_wrapper { task_id(nullptr), prof(nullptr), guid(0ull), - parent_guid(0ull), - parent(nullptr), + parents({}), tree_node(nullptr), alias(nullptr), thread_id(0UL), create_ns(our_clock::now_ns()), - explicit_trace_start(false) + explicit_trace_start(false), + state(CREATED) { } +/** + \brief Destructor. + */ + ~task_wrapper(void) { + // don't delete the alias, because task_identifier objects never go away + } + /** \brief Get the task_identifier for this task_wrapper. \returns A pointer to the task_identifier @@ -124,7 +134,8 @@ struct task_wrapper { const std::string apex_main_str(APEX_MAIN_STR); tt_ptr = std::make_shared(); tt_ptr->task_id = task_identifier::get_task_id(apex_main_str); - tt_ptr->tree_node = new dependency::Node(tt_ptr->task_id, nullptr); + std::shared_ptr tmp(new dependency::Node(tt_ptr->task_id, nullptr)); + tt_ptr->tree_node = tmp; } mtx.unlock(); } @@ -132,11 +143,17 @@ struct task_wrapper { } void assign_heritage() { // make/find a node for ourselves - tree_node = parent->tree_node->appendChild(task_id); + //tree_node = parents[0]->tree_node->appendChild(task_id); + for(auto& parent : parents) { + tree_node = parent->tree_node->appendChild(task_id, tree_node); + } } void update_heritage() { // make/find a node for ourselves - tree_node = parent->tree_node->replaceChild(task_id, alias); + //tree_node = parents[0]->tree_node->replaceChild(task_id, alias); + for(auto& parent : parents) { + tree_node = parent->tree_node->replaceChild(task_id, alias); + } } double get_create_us() { return double(create_ns) * 1.0e-3; diff --git a/src/apex/taskstubs_implementation.cpp b/src/apex/taskstubs_implementation.cpp index f6493630..f98f8f7a 100644 --- a/src/apex/taskstubs_implementation.cpp +++ b/src/apex/taskstubs_implementation.cpp @@ -12,10 +12,17 @@ #include "thread_instance.hpp" #include #include +#include +#include + using maptype = std::unordered_map>; -std::mutex mtx; + +std::mutex& mtx(void) { + static std::mutex mtx; + return mtx; +} maptype& getCommonMap(void) { static maptype theMap; @@ -27,47 +34,94 @@ maptype& getMyMap(void) { return theMap; } +void verbosePrint(const char *format, ...) +{ + static std::mutex local_mtx; + std::scoped_lock lock{local_mtx}; + va_list args; + va_start(args, format); + vprintf(format, args); + va_end(args); +} + +#define VERBOSE_PRINTF(...) if (apex::apex_options::use_verbose()) { verbosePrint(__VA_ARGS__); } + +void safePrint(const char * format, tasktimer_guid_t guid, const char * event) { + //static std::mutex local_mtx; + //std::scoped_lock lock{local_mtx}; + VERBOSE_PRINTF("%lu TS: %s: GUID %p %s\n", apex::thread_instance::get_id(), format, guid, event); + return; +} + void safeInsert( tasktimer_guid_t guid, std::shared_ptr task) { - mtx.lock(); - getCommonMap()[guid] = task; - mtx.unlock(); +#if 0 + { + std::scoped_lock lock{mtx()}; + getCommonMap()[guid] = task; + } getMyMap()[guid] = task; +#else + std::scoped_lock lock{mtx()}; + getCommonMap()[guid] = task; +#endif + //safePrint("Inserted", guid); } std::shared_ptr safeLookup( - tasktimer_guid_t guid) { + tasktimer_guid_t guid, const char * event) { +#if 0 // in the thread local map? auto task = getMyMap().find(guid); if (task == getMyMap().end()) { // in the common map? - std::scoped_lock lock{mtx}; - task = getCommonMap().find(guid); + { + std::scoped_lock lock{mtx()}; + task = getCommonMap().find(guid); + } if (task == getCommonMap().end()) { + safePrint("Not found", guid); + APEX_ASSERT(false); return nullptr; } getMyMap()[guid] = task->second; } +#else + std::scoped_lock lock{mtx()}; + auto task = getCommonMap().find(guid); + if (task == getCommonMap().end()) { + safePrint("Not found", guid, event); + //APEX_ASSERT(false); + return nullptr; + } + #endif + //safePrint("Found", guid); return task->second; } void safeErase( tasktimer_guid_t guid) { + return; + /* getMyMap().erase(guid); - mtx.lock(); - getCommonMap().erase(guid); - mtx.unlock(); + { + std::scoped_lock lock{mtx()}; + getCommonMap().erase(guid); + } + //safePrint("Destroyed", guid); + */ } extern "C" { // library function declarations void tasktimer_initialize_impl(void) { apex::init("PerfStubs API", 0, 1); + apex::apex_options::use_thread_flow(true); } void tasktimer_finalize_impl(void) { /* Debatable whether we want to do this finalize */ - apex::finalize(); + //apex::finalize(); } // measurement function declarations tasktimer_timer_t tasktimer_create_impl( @@ -76,21 +130,24 @@ extern "C" { const tasktimer_guid_t timer_guid, const tasktimer_guid_t* parent_guids, const uint64_t parent_count) { - // TODO: need to handle multiple parents! + static bool& over = apex::get_program_over(); + if (over) return nullptr; + safePrint("Creating", timer_guid, timer_name); + // need to look up the parent shared pointers? std::vector> parent_tasks; for (uint64_t i = 0 ; i < parent_count ; i++) { - auto tmp = safeLookup(parent_guids[i]); + auto tmp = safeLookup(parent_guids[i], "parent lookup"); if (tmp != nullptr) parent_tasks.push_back(tmp); } // if no name, use address if (timer_name == nullptr || strlen(timer_name) == 0) { - // TODO: need to handle multiple parents! - if (parent_tasks.size() > 0) { + //VERBOSE_PRINTF("Null name for timer: %p\n", function_address); + if (parent_count > 0) { auto task = apex::new_task( (apex_function_address)function_address, - timer_guid, parent_tasks[0]); + timer_guid, parent_tasks); safeInsert(timer_guid, task); } else { auto task = apex::new_task( @@ -99,80 +156,200 @@ extern "C" { safeInsert(timer_guid, task); } } else { + std::string tmpname{timer_name}; + //tmpname += std::string(" "); + //tmpname += std::to_string(timer_guid); // TODO: need to handle multiple parents! if (parent_tasks.size() > 0) { - auto task = apex::new_task(timer_name, timer_guid, parent_tasks[0]); + auto task = apex::new_task(tmpname, timer_guid, parent_tasks); safeInsert(timer_guid, task); } else { - auto task = apex::new_task(timer_name, timer_guid); + auto task = apex::new_task(tmpname, timer_guid); safeInsert(timer_guid, task); } } return (tasktimer_timer_t)(timer_guid); } + +#define MAP_TASK(_timer, _apex_timer, _event) \ + static bool& over_{apex::get_program_over()}; \ + if (over_) return; \ + uint64_t _tmp = (uint64_t)(_timer); \ + auto _apex_timer = safeLookup(_tmp, _event); + void tasktimer_schedule_impl( tasktimer_timer_t timer, tasktimer_argument_value_p arguments, uint64_t argument_count) { + MAP_TASK(timer, apex_timer, "schedule"); + VERBOSE_PRINTF("%lu TS: Scheduling: %p %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); + static bool& over = apex::get_program_over(); + if (over) return; // TODO: handle the schedule event, somehow APEX_UNUSED(timer); APEX_UNUSED(arguments); APEX_UNUSED(argument_count); } -#define MAP_TASK(_timer, _apex_timer) \ - uint64_t _tmp = (uint64_t)(_timer); \ - auto _apex_timer = safeLookup(_tmp); - void tasktimer_start_impl( tasktimer_timer_t timer, tasktimer_execution_space_p) { // TODO: capture the execution space, somehow...a new task? - MAP_TASK(timer, apex_timer); - apex::start(apex_timer); + MAP_TASK(timer, apex_timer, "start"); + VERBOSE_PRINTF("%lu TS: Starting: %p %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); + if (apex_timer != nullptr) { + apex::start(apex_timer); + } } void tasktimer_yield_impl( tasktimer_timer_t timer) { - MAP_TASK(timer, apex_timer); - apex::yield(apex_timer); +#if 1 + static bool& over = apex::get_program_over(); + if (over) return; + MAP_TASK(timer, apex_timer, "yield"); + VERBOSE_PRINTF("%lu TS: Yielding: %p %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); + if (apex_timer != nullptr) { + apex::yield(apex_timer); + } +#endif } void tasktimer_resume_impl( tasktimer_timer_t timer, tasktimer_execution_space_p) { +#if 1 // TODO: capture the execution space, somehow...a new task? - MAP_TASK(timer, apex_timer); + MAP_TASK(timer, apex_timer, "resume"); + VERBOSE_PRINTF("%lu TS: Resuming: %p %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); // TODO: why no resume function for task_wrapper objects? - apex::start(apex_timer); + if (apex_timer != nullptr) { + apex::resume(apex_timer); + } else { + APEX_ASSERT(false); + } +#endif } void tasktimer_stop_impl( tasktimer_timer_t timer) { - MAP_TASK(timer, apex_timer); - apex::stop(apex_timer); + static std::set stopped; + static std::mutex local_mtx; + MAP_TASK(timer, apex_timer, "stop"); + VERBOSE_PRINTF("%lu TS: Stopping: %p %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); + if (apex_timer != nullptr) { + { + std::scoped_lock lock{local_mtx}; + if (stopped.count(timer) > 0) { + VERBOSE_PRINTF("%lu TS: ERROR! TIMER STOPPED TWICE! : %p\n", apex::thread_instance::get_id(), timer); + return; + } + stopped.insert(timer); + } + apex::stop(apex_timer); + } } void tasktimer_destroy_impl( tasktimer_timer_t timer) { - MAP_TASK(timer, apex_timer); - // TODO: need to handle the destroy event somehow. - // definitely need to remove it from the local map. - safeErase(apex_timer->guid); + MAP_TASK(timer, apex_timer, "destroy"); + VERBOSE_PRINTF("%lu TS: Destroying: %p %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); + if (apex_timer != nullptr) { + //if (apex_timer->state == apex::task_wrapper::RUNNING) { apex::stop(apex_timer); } + // TODO: need to handle the destroy event somehow. + // definitely need to remove it from the local map. + safeErase(apex_timer->guid); + } } void tasktimer_add_parents_impl ( tasktimer_timer_t timer, const tasktimer_guid_t* parents, const uint64_t parent_count) { // TODO: need to handle the add parents event - MAP_TASK(timer, apex_timer); - APEX_UNUSED(apex_timer); - APEX_UNUSED(parents); - APEX_UNUSED(parent_count); + MAP_TASK(timer, apex_timer, "add parents"); + VERBOSE_PRINTF("%lu TS: Adding parents: %p %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); + if (apex_timer != nullptr) { + for (uint64_t i = 0 ; i < parent_count ; i++) { + auto tmp = safeLookup(parents[i], "parent lookup"); + if (tmp != nullptr) { + // add the parent to the child + apex_timer->parents.push_back(tmp); + } + } + // update the child tree + if (apex::apex_options::use_tasktree_output() || + apex::apex_options::use_hatchet_output()) { + apex_timer->assign_heritage(); + } + } } void tasktimer_add_children_impl( tasktimer_timer_t timer, const tasktimer_guid_t* children, const uint64_t child_count) { // TODO: need to handle the add children event - MAP_TASK(timer, apex_timer); - APEX_UNUSED(apex_timer); - APEX_UNUSED(children); - APEX_UNUSED(child_count); + MAP_TASK(timer, apex_timer, "add children"); + VERBOSE_PRINTF("%lu TS: Adding children: %p %s\n", apex::thread_instance::get_id(), timer, + apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str()); + if (apex_timer != nullptr) { + for (uint64_t i = 0 ; i < child_count ; i++) { + auto tmp = safeLookup(children[i], "child lookup"); + if (tmp != nullptr) { + // add the parent to the child + tmp->parents.push_back(apex_timer); + // update the child tree + if (apex::apex_options::use_tasktree_output() || + apex::apex_options::use_hatchet_output()) { + tmp->assign_heritage(); + } + } + } + } + } + + void timerStack( + std::shared_ptr apex_timer, + bool start) { + static thread_local std::stack> theStack; + if (start) { + apex::start(apex_timer); + theStack.push(apex_timer); + } else { + auto timer = theStack.top(); + apex::stop(timer); + theStack.pop(); + } + } + + void tasktimer_data_transfer_start_impl( + tasktimer_guid_t guid, + tasktimer_execution_space_p source_type, + const char* source_name, + const void* source_ptr, + tasktimer_execution_space_p dest_type, + const char* dest_name, + const void* dest_ptr) { + std::shared_ptr parent = safeLookup(guid, "data transfer"); + auto task = apex::new_task("data xfer", 0, parent); + timerStack(task, true); + } + + void tasktimer_data_transfer_stop_impl(tasktimer_guid_t guid) { + timerStack(nullptr, false); + } + + void tasktimer_command_start_impl(const char* type_name) { + // we need to create a unique GUID for the command + static tasktimer_guid_t guid{UINT64_MAX/2}; + std::string tmpstr{type_name}; + auto task = apex::new_task(tmpstr, guid++); + timerStack(task, true); } + + void tasktimer_command_stop_impl(void) { + timerStack(nullptr, false); + } + } diff --git a/src/apex/thread_instance.cpp b/src/apex/thread_instance.cpp index 25e8bee1..06a358c6 100644 --- a/src/apex/thread_instance.cpp +++ b/src/apex/thread_instance.cpp @@ -62,10 +62,6 @@ map thread_instance::_worker_map; std::mutex thread_instance::_worker_map_mutex; // Global static path to executable name string * thread_instance::_program_path = nullptr; -// Global static unordered map of parent GUIDs to child GUIDs -// to handle "overlapping timer" problem. -std::unordered_map* > - thread_instance::_children_to_resume; */ #ifdef APEX_DEBUG // Global static mutex to control access for debugging purposes @@ -88,7 +84,7 @@ thread_instance::~thread_instance(void) { } std::cout << "thread " << _id << " exiting... " << __APEX_FUNCTION__ << std::endl; } - if (apex::get_program_over()) { return; } + if (get_program_over()) { return; } if (_top_level_timer != nullptr) { stop(_top_level_timer); _top_level_timer = nullptr; @@ -245,14 +241,9 @@ string thread_instance::map_addr_to_name(apex_function_address function_address) return _function_map[function_address]; } -void thread_instance::set_current_profiler(profiler * the_profiler) { - instance().current_profilers.push_back(the_profiler); - //printf("%lu pushing %s\n", get_id(), the_profiler->get_task_id()->get_short_name().c_str()); -} - -profiler * thread_instance::restore_children_profilers( +profiler* thread_instance::restore_children_profilers( std::shared_ptr &tt_ptr) { - profiler * parent = instance().get_current_profiler(); + profiler* parent = instance().get_current_profiler(); // if there are no children to restore, return. if (tt_ptr == nullptr || tt_ptr->data_ptr.size() == 0) {return parent;} // Get the vector of children that we stored @@ -264,7 +255,7 @@ profiler * thread_instance::restore_children_profilers( // make sure to set the current profiler - the profiler_listener // is bypassed by the resume method, above. It's the listener that // sets the current profiler when a timer is started - thread_instance::set_current_profiler((*myprof)); + //thread_instance::instance().set_current_profiler((*myprof)); } // clear the vector. myvec->clear(); @@ -272,59 +263,30 @@ profiler * thread_instance::restore_children_profilers( return parent; } -void thread_instance::clear_all_profilers() { - // nothing to do? - if (current_profilers.empty() || !_is_worker) return; - // copy the stack - auto the_stack(current_profilers); - auto tmp = the_stack.back(); - while (the_stack.size() > 0) { - /* Make a copy of the profiler object on the top of the stack. */ - profiler * profiler_copy = new profiler(*tmp); - /* Stop the copy, using a special internal function. */ - apex::stop_internal(profiler_copy); - // pop the child from the stack copy - the_stack.pop_back(); - if (the_stack.empty()) { return; } - // get the new top of the stack - tmp = the_stack.back(); - } -} - -void thread_instance::clear_current_profiler(profiler * the_profiler, +void thread_instance::clear_current_profiler( bool save_children, std::shared_ptr &tt_ptr) { - // this is a stack variable that provides safety when using recursion. static APEX_NATIVE_TLS bool fixing_stack = false; - // this is a serious problem... - if (instance().current_profilers.empty()) { - // unless...we happen to be exiting. Bets are off. - if (apex_options::suspend() == true) { return; } - if (apex_options::untied_timers() == true) { return; } - // if we've already cleared the stack on this thread, we're fine - if (instance()._exiting) { return; } - std::cerr << "APEX: Warning! empty profiler stack!!!\n"; - std::cerr << "If a profiler object was started on one OS thread "; - std::cerr << "and stopped/yielded on another, please run with the "; - std::cerr << "environment variable 'APEX_UNTIED_TIMERS=1' or "; - std::cerr << "use the C++ API call 'apex::untied_timers(true);' "; - std::cerr << "or C API call 'apex_set_untied_timers(1);'\n" << std::endl; - std::cerr << "Attempted to stop timer: " << the_profiler->get_task_id()->get_name(true) << std::endl; - APEX_ASSERT(false); - // redundant, but assert gets bypassed in a debug build. - abort(); - } // check for recursion if (fixing_stack) {return;} - // get the current stack of timers - auto &the_stack = instance().current_profilers; - auto tmp = the_stack.back(); - //printf("%lu popping %s\n", get_id(), tmp->get_task_id()->get_short_name().c_str()); - /* Uh-oh! Someone has caused the dreaded "overlapping timer" problem to - * happen! No problem - stop the child timer. - * Keep the children around, along with a reference to the parent's - * guid so that if/when we see this parent again, we can restart - * the children timers. */ - if (the_stack.size() > 1 && tmp != the_profiler) { + // get the current profiler + profiler* tmp = instance().current_profiler; + // get the task wrapper's profiler + profiler* the_profiler = tt_ptr->prof; + // This thread has no running timers, do nothing. + if (tmp == nullptr) { + //printf("Setting current profiler to nullptr\n"); + return; + } + // if this profiler was started somewhere else, do nothing. + if (the_profiler->thread_id != instance().get_id()) { + //printf("Doing nothing with current profiler\n"); + return; + } + // if the current profiler isn't this profiler, is it in the "stack"? + // we know the current profiler and the one we are stopping are + // on the same thread. Assume we are handling a "direct action" that was + // yielded. + if (tmp != the_profiler) { fixing_stack = true; // if the data pointer location isn't available, we can't support this runtime. // create a vector to store the children @@ -337,40 +299,38 @@ void thread_instance::clear_current_profiler(profiler * the_profiler, /* Make a copy of the profiler object on the top of the stack. */ profiler * profiler_copy = new profiler(*tmp); tt_ptr->data_ptr.push_back(tmp); - /* Stop the copy. The original will get reset when the + /* yield the copy. The original will get reset when the parent resumes. */ - stop(profiler_copy, false); // we better be re-entrant safe! + yield(profiler_copy); // we better be re-entrant safe! } else { // since we aren't yielding, just stop the children. stop(tmp); // we better be re-entrant safe! } - // pop the original child, we've saved it in the vector - the_stack.pop_back(); - // this is a serious problem... - if (the_stack.empty()) { + // this is a serious problem...or is it? no! + if (tmp->untied_parent == nullptr) { + /* // unless...we happen to be exiting. Bets are off. if (apex_options::suspend() == true) { return; } // if we've already cleared the stack on this thread, we're fine if (instance()._exiting) { return; } - std::cerr << "Warning! empty profiler stack!\n"; + std::cerr << "Warning! empty profiler stack!" << __LINE__ << "\n"; APEX_ASSERT(false); //abort(); + */ + instance().current_profiler = nullptr; + //printf("Setting current profiler to nullptr\n"); return; } // get the new top of the stack - tmp = the_stack.back(); + tmp = tmp->untied_parent; //printf("%lu popping? %s\n", get_id(), tmp->get_task_id()->get_short_name().c_str()); } // done with the stack, allow proper recursion again. fixing_stack = false; } - // pop this timer off the stack. - the_stack.pop_back(); -} - -profiler * thread_instance::get_current_profiler(void) { - if (instance().current_profilers.empty()) { return nullptr; } - return instance().current_profilers.back(); + //printf("%s Setting current profiler from %s to %s\n", __func__, instance().current_profiler->tt_ptr->task_id->get_name().c_str(), + //tmp->untied_parent == nullptr ? "nullptr" : tmp->untied_parent->tt_ptr->task_id->get_name().c_str()); + instance().current_profiler = tmp->untied_parent; } } diff --git a/src/apex/thread_instance.hpp b/src/apex/thread_instance.hpp index d8322d0d..ec43eb21 100644 --- a/src/apex/thread_instance.hpp +++ b/src/apex/thread_instance.hpp @@ -42,6 +42,7 @@ class common_data { public: // map from name to thread id - common to all threads std::map _name_map; + std::set _tids; std::mutex _name_map_mutex; shared_mutex_type _function_map_mutex; // map from thread id to is_worker @@ -51,7 +52,6 @@ class common_data { std::atomic_int _num_workers; std::atomic_int _active_threads; std::string * _program_path; - std::unordered_map* > _children_to_resume; }; class thread_instance { @@ -88,13 +88,13 @@ class thread_instance { static std::atomic_int _num_workers; static std::atomic_int _active_threads; static std::string * _program_path; - static std::unordered_map* > _children_to_resume; */ // constructor thread_instance (bool is_worker) : _id(-1), _id_reversed(UINTMAX_MAX), _runtime_id(-1), _top_level_timer_name(), _is_worker(is_worker), _task_count(0), - _top_level_timer(nullptr), _exiting(false) { + _top_level_timer(nullptr), _exiting(false), + current_profiler(nullptr) { /* Even do this for non-workers, because for CUPTI processing we need to * generate GUIDs for the activity events! */ _id = common()._num_threads++; @@ -114,6 +114,8 @@ class thread_instance { } std::cout << "thread " << _id << " starting... " << __APEX_FUNCTION__ << std::endl; } + std::lock_guard l{common()._name_map_mutex}; + common()._tids.insert(pthread_self()); }; // private default constructor thread_instance () = delete; @@ -123,7 +125,7 @@ class thread_instance { thread_instance& operator=(thread_instance const&)= delete; // map from function address to name - unique to all threads to avoid locking std::map _function_map; - std::vector current_profilers; + profiler* current_profiler; uint64_t _get_guid(void) { // start at 1, because 0 means nullptr which means "no parent" _task_count++; @@ -153,14 +155,16 @@ class thread_instance { static int get_num_threads(void) { return common()._num_threads; }; static int get_num_workers(void) { return common()._num_workers; }; std::string map_addr_to_name(apex_function_address function_address); - static profiler * restore_children_profilers(std::shared_ptr &tt_ptr); - static void set_current_profiler(profiler * the_profiler); - static profiler * get_current_profiler(void); - static void clear_current_profiler(profiler * the_profiler, - bool save_children, std::shared_ptr &tt_ptr); - static void clear_current_profiler() { - instance().current_profilers.pop_back(); + static profiler* restore_children_profilers(std::shared_ptr &tt_ptr); + void set_current_profiler(profiler* the_profiler) { + //printf("%s Setting current profiler from %s to %s\n", __func__, current_profiler == nullptr ? "nullptr" : current_profiler->tt_ptr->task_id->get_name().c_str(), the_profiler->tt_ptr->task_id->get_name().c_str()); + the_profiler->untied_parent = current_profiler; + current_profiler = the_profiler; } + profiler* get_current_profiler(void) { + return current_profiler; + } + static void clear_current_profiler(bool save_children, std::shared_ptr &tt_ptr); static const char * program_path(void); static bool is_worker() { return instance()._is_worker; } static uint64_t get_guid() { return instance()._get_guid(); } @@ -177,6 +181,10 @@ class thread_instance { instance(false)._exiting = true; } void clear_all_profilers(void); + static std::set gettids(void) { + std::lock_guard l{common()._name_map_mutex}; + return common()._tids; + } #ifdef APEX_DEBUG static std::mutex _open_profiler_mutex; diff --git a/src/apex/trace_event_listener.cpp b/src/apex/trace_event_listener.cpp index 5506e2cf..abacefca 100644 --- a/src/apex/trace_event_listener.cpp +++ b/src/apex/trace_event_listener.cpp @@ -103,16 +103,34 @@ void trace_event_listener::on_exit_thread(event_data &data) { return; } +inline std::string parents_to_string(std::shared_ptr tt_ptr) { + if (tt_ptr->parents.size() == 0) { + return std::string("0"); + } + if (tt_ptr->parents.size() == 1) { + APEX_ASSERT (tt_ptr->parents[0] != nullptr); + return std::to_string(tt_ptr->parents[0]->guid); + } + std::string parents{""}; + std::string delimiter{"["}; + for (auto& parent : tt_ptr->parents) { + if (parent != nullptr) { + parents += delimiter + std::to_string(parent->guid); + delimiter = ","; + } + } + parents += "]"; + return parents; +} + inline void trace_event_listener::_common_start(std::shared_ptr &tt_ptr) { static APEX_NATIVE_TLS long unsigned int tid = get_thread_id_metadata(); + long unsigned int _tid = tt_ptr->prof->thread_id; if (!_terminate) { std::stringstream ss; ss.precision(3); ss << fixed; - uint64_t pguid = 0; - if (tt_ptr->parent != nullptr) { - pguid = tt_ptr->parent->guid; - } + std::string pguid = parents_to_string(tt_ptr); ss << "{\"name\":\"" << tt_ptr->get_task_id()->get_name() << "\",\"cat\":\"CPU\"" << ",\"ph\":\"B\",\"pid\":" @@ -198,31 +216,31 @@ void write_flow_event(std::stringstream& ss, double ts, char ph, inline void trace_event_listener::_common_stop(std::shared_ptr &p) { static APEX_NATIVE_TLS long unsigned int tid = get_thread_id_metadata(); + static auto main_wrapper = task_wrapper::get_apex_main_wrapper(); // With HPX, the APEX MAIN timer will be stopped on a different thread // than the one that started it. So... make sure we get the right TID // But don't worry, the thread metadata will have been written at the // event start. - long unsigned int _tid = (p->tt_ptr->explicit_trace_start ? p->thread_id : tid); + //long unsigned int _tid = (p->tt_ptr->explicit_trace_start ? p->thread_id : tid); + long unsigned int _tid = p->thread_id; if (!_terminate) { std::stringstream ss; ss.precision(3); ss << fixed; - uint64_t pguid = 0; - if (p->tt_ptr != nullptr && p->tt_ptr->parent != nullptr) { - pguid = p->tt_ptr->parent->guid; - } // if the parent tid is not the same, create a flow event BEFORE the single event - if (p->tt_ptr->parent != nullptr + for (auto& parent : p->tt_ptr->parents) { + if (parent != nullptr && parent != main_wrapper #ifndef APEX_HAVE_HPX // ...except for HPX - make the flow event regardless - && p->tt_ptr->parent->thread_id != _tid + && (parent->thread_id != _tid || apex_options::use_thread_flow()) #endif ) { - //std::cout << "FLOWING!" << std::endl; - uint64_t flow_id = reversed_node_id + get_flow_id(); - write_flow_event(ss, p->tt_ptr->parent->get_flow_us()+0.25, 's', "ControlFlow", flow_id, - saved_node_id, p->tt_ptr->parent->thread_id, p->tt_ptr->parent->task_id->get_name(), p->get_task_id()->get_name()); - write_flow_event(ss, p->get_start_us()-0.25, 'f', "ControlFlow", flow_id, - saved_node_id, _tid, p->tt_ptr->parent->task_id->get_name(), p->get_task_id()->get_name()); + //std::cout << "FLOWING!" << std::endl; + uint64_t flow_id = reversed_node_id + get_flow_id(); + write_flow_event(ss, parent->get_flow_us()+0.25, 's', "ControlFlow", flow_id, + saved_node_id, parent->thread_id, "", ""); //parent->task_id->get_name(), p->get_task_id()->get_name()); + write_flow_event(ss, p->get_start_us()-0.25, 'f', "ControlFlow", flow_id, + saved_node_id, _tid, "", ""); //parent->task_id->get_name(), p->get_task_id()->get_name()); + } } if (p->tt_ptr->explicit_trace_start) { ss << "{\"name\":\"" << p->get_task_id()->get_name() @@ -232,6 +250,7 @@ inline void trace_event_listener::_common_stop(std::shared_ptr &p) { << ",\"ts\":" << p->get_stop_us() << "},\n"; } else { + std::string pguid = parents_to_string(p->tt_ptr); ss << "{\"name\":\"" << p->get_task_id()->get_name() << "\",\"cat\":\"CPU\"" << ",\"ph\":\"X\",\"pid\":" @@ -361,10 +380,7 @@ void trace_event_listener::on_async_event(base_thread_node &node, ss.precision(3); ss << fixed; std::string tid{make_tid(node)}; - uint64_t pguid = 0; - if (p->tt_ptr != nullptr && p->tt_ptr->parent != nullptr) { - pguid = p->tt_ptr->parent->guid; - } + std::string pguid = parents_to_string(p->tt_ptr); ss << "{\"name\":\"" << p->get_task_id()->get_name() << "\",\"cat\":\"GPU\"" << ",\"ph\":\"X\",\"pid\":" diff --git a/src/apex/tree.cpp b/src/apex/tree.cpp index 92bfc83c..fd45d394 100644 --- a/src/apex/tree.cpp +++ b/src/apex/tree.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "apex_assert.h" namespace apex { @@ -15,9 +16,13 @@ std::atomic node::count{0}; node * node::buildTree(std::vector>& rows, node * root) { if(rows.size() == 0) return root; + // process (mpi rank) //size_t rank = stol(rows[0][0]); + // node (tree node) size_t index = stol(rows[0][1]); - size_t pindex = stol(rows[0][2]); + // parent (parent tree node) + //size_t pindex = stol(rows[0][2]); + // depth //size_t depth = stol(rows[0][3]); std::string name = rows[0][4]; std::unordered_map node_map; @@ -30,20 +35,27 @@ node * node::buildTree(std::vector>& rows, for(size_t i = 1 ; i < rows.size() ; i++) { //rank = stol(rows[i][0]); index = stol(rows[i][1]); - pindex = stol(rows[i][2]); + std::string tmpstr(rows[i][2]); + std::stringstream parents(tmpstr); + size_t pindex; //depth = stol(rows[i][3]); name = rows[i][4]; + while(true) { + parents >> pindex; + if (!parents) break; + std::cout << index << " Found parent: " << pindex << std::endl; /* std::cout << rank << " " << index << " " << pindex << " " << depth << " " << name << std::endl; */ - APEX_ASSERT(node_map.count(pindex) > 0); - auto parent = node_map.at(pindex); - node * newChild = parent->addChild(name); - node_map.insert(std::pair(index,newChild)); - // update the row data with the tree data - rows[i][1] = std::to_string(newChild->_index); - rows[i][2] = std::to_string(parent->_index); + APEX_ASSERT(node_map.count(pindex) > 0); + auto parent = node_map.at(pindex); + node * newChild = parent->addChild(name); + node_map.insert(std::pair(index,newChild)); + // update the row data with the tree data + rows[i][1] = std::to_string(newChild->_index); + rows[i][2] = std::to_string(parent->_index); + } } std::mutex foo; std::scoped_lock{foo}; diff --git a/src/apex/utils.cpp b/src/apex/utils.cpp index 8f580518..4f8da121 100644 --- a/src/apex/utils.cpp +++ b/src/apex/utils.cpp @@ -779,9 +779,6 @@ std::string getCommandLine(void) { #else return proc_data_reader::get_command_line(); #endif - // just in case things failed - std::string tmp{"unknown"}; - return tmp; } } // namespace apex diff --git a/src/scripts/apex-treesummary.py b/src/scripts/apex-treesummary.py index bd519294..d4bb75de 100755 --- a/src/scripts/apex-treesummary.py +++ b/src/scripts/apex-treesummary.py @@ -5,6 +5,7 @@ import argparse from argparse import RawTextHelpFormatter import math +import matplotlib as mp import os import re @@ -66,11 +67,15 @@ def parseArgs(): return args nodeIndex = 0 +printedIndexes = set() class TreeNode: def __init__(self, name, df): global nodeIndex self.name = name - self.index = nodeIndex + if df.empty: + self.index = None #nodeIndex + else: + self.index = df['node index'].iloc[0] #nodeIndex nodeIndex = nodeIndex + 1 self.children = {} self.df = df @@ -98,7 +103,7 @@ def addChild(self, name, df): def print(self, depth, total, maxranks): tmpstr = str() acc_mean = 0.0 - if not self.df.empty: + if (not self.df.empty) and (not self.index in printedIndexes): metric = 'total time(s)' rows = str(len(self.df.index)) tmpstr = tmpstr + rows.rjust(len(str(maxranks)), ' ') @@ -126,12 +131,13 @@ def print(self, depth, total, maxranks): tmpstr = tmpstr + ', mean=' + '%.3f' % acc_mean_per_call tmpstr = tmpstr + ', threads=' + str(int(acc_threads)) tmpstr = tmpstr + '} ' + self.name + '\n' + printedIndexes.add(self.index) totals = {} strings = {} for key in self.children: - value, childstr = self.children[key].print(depth+1, total, maxranks) - totals[key] = value - strings[key] = childstr + value, childstr = self.children[key].print(depth+1, total, maxranks) + totals[key] = value + strings[key] = childstr sorted_by_value = dict(sorted(totals.items(), key=lambda x:x[1], reverse=True)) for key in sorted_by_value: tmpstr = tmpstr + strings[key] @@ -183,35 +189,6 @@ def shorten_name(name): short = (tmp[:67] + '...') if len(tmp) > 67 else tmp return short -def get_node_color_visible(v, vmin, vmax): - if v > vmax: - v = vmax - dv = vmax - vmin - if dv == 0: - dv = 1 - frac = 1.0 - ((v-vmin)/dv) - # red is full on - red = 255 - # blue should grow proportionally - blue = int(frac * 255) - green = int(frac * 255) - return red, blue, green - -def get_node_color_visible_one(v, vmin, vmax): - if math.isnan(v) or math.isnan(vmin) or math.isnan(vmax) or vmax == vmin or \ - math.isinf(v) or math.isinf(vmin) or math.isinf(vmax) or vmax == vmin: - return 255 - if v > vmax: - v = vmax - dv = vmax - vmin - if dv <= 0.0: - dv = 1.0 - frac = 1.0 - ((v-vmin)/dv) - if math.isnan(frac): - frac = 1.0 - intensity = int(frac * 255) - return intensity - def drawDOT(df, args, name): # computing new stats if args.verbose: @@ -225,7 +202,7 @@ def drawDOT(df, args, name): df.loc[df['calls'] == 0, 'bytes per call'] = df['total bytes'] metric = 'bytes per call' # Make a new dataframe from rank 0 - filename = name + 'tasktree.dot'; + filename = name.replace(' ', '-') + '-tasktree.dot'; f = open(filename, 'w') f.write('digraph prof {\n') #f.write(' label = "(get this from metadata file output - or, generate it from apex-treesummary.py!)";\n') @@ -245,6 +222,7 @@ def drawDOT(df, args, name): bpc_maximum = df['bytes per call'].max() # get max if args.verbose: print('Building dot file') + graphedIndexes = set() for ind in df.index: name = df['name'][ind] node_index = df['node index'][ind] @@ -254,25 +232,24 @@ def drawDOT(df, args, name): # Remember, the root node is bogus. so skip it. if node_index != parent_index: f.write(' "' + str(parent_index) + '" -> "' + str(node_index) + '";\n') + if node_index in graphedIndexes: + continue f.write(' "' + str(node_index) + '" [shape=box; ') f.write('style=filled; ') acc = df['total time(s)'][ind] bpc = df['bytes per call'][ind] + frac = acc / (acc_maximum - acc_minimum) if "MPI" in name and bpc > 0: - red = int(255) - green = get_node_color_visible_one(bpc, bpc_minimum, bpc_maximum) - blue = get_node_color_visible_one(acc, acc_minimum, acc_maximum) - if blue > red: - red = green - blue = int(255) - else: - blue = green + cmap = mp.colormaps.get_cmap('Reds') else: - red = get_node_color_visible_one(acc, acc_minimum, acc_maximum) - green = red - blue = int(255) + cmap = mp.colormaps.get_cmap('Blues') + rgba = cmap(frac) + rgba = tuple(int((255*x)) for x in rgba[0:3]) + red = rgba[0] + green = rgba[1] + blue = rgba[2] f.write('color=black; ') - if (acc > 0.5 * acc_maximum): + if (frac > 0.5): f.write('fontcolor=white; ') else: f.write('fontcolor=black; ') @@ -296,6 +273,7 @@ def drawDOT(df, args, name): f.write('time: ' + str(acc) + '\\l"; ') f.write('];\n') + graphedIndexes.add(node_index) f.write('}') f.close() if args.dot_show: @@ -306,9 +284,11 @@ def drawDOT(df, args, name): if args.verbose: print('done.') -def graphRank(index, df, parentNode, droplist, args): +def graphRank2(index, df, parentNode, droplist, args): # get the name of this node childDF = df[df['node index'] == index].copy()#.reset_index() + if childDF.shape[0] > 1: + childDF = childDF[childDF['parent index'] == parentNode.index]#.reset_index() name = childDF['name'].iloc[0] # should we skip this subtree? if name in droplist: @@ -325,32 +305,9 @@ def graphRank(index, df, parentNode, droplist, args): #name = df.loc[df['node index'] == index, 'name'].iloc[0] childNode = parentNode.addChild(name, childDF) - # slice out the children from the dataframe - children = df[df['parent index'] == index] - # Iterate over the children indexes and add to our node - for child in children['node index'].unique(): - if child == index: - continue - graphRank(child, df, childNode, droplist, args) - -def graphRank2(index, df, parentNode, droplist, args): - # get the name of this node - childDF = df[df['node index'] == index].copy()#.reset_index() - name = childDF['name'].iloc[0] - # should we skip this subtree? - if name in droplist: - if args.verbose: - print('Dropping: \'', name, '\'', sep='') + # If we have visited this tree before, we are done. + if childDF['visited'].item(): return - for dropped in droplist: - p = re.compile(dropped) - if p.match(name): - if args.verbose: - print('Dropping: \'', name, '\'', sep='') - return - - #name = df.loc[df['node index'] == index, 'name'].iloc[0] - childNode = parentNode.addChild(name, childDF) # slice out the children from the dataframe children = df[df['parent index'] == index] @@ -359,6 +316,8 @@ def graphRank2(index, df, parentNode, droplist, args): if child == index: continue graphRank2(child, df, childNode, droplist, args) + df.loc[df['node index'] == index,'visited'] = True +import ast def main(): args = parseArgs() @@ -370,6 +329,13 @@ def main(): print('Reading tasktree...') df = pd.read_csv(args.filename) #, index_col=[0,1]) df = df.fillna(0) + #print(df) + # Convert the string representation of the list of parents to a list + df.loc[:, "parent index"] = df["parent index"].apply(ast.literal_eval) + df = df.explode('parent index') + df = df.fillna(-1) + #print(df) + if args.verbose: print('Read', len(df.index), 'rows') @@ -421,16 +387,9 @@ def main(): # FIRST, build a master graph with all nodes from all ranks. print('building common tree...') root = TreeNode('apex tree base', pd.DataFrame()) - """ - for x in range(maxrank+1): - print('Rank', x, '...', end=endchar, flush=True) - # slice out this rank's data - rank = df[df['process rank'] == x] - # build a tree of this rank's data - graphRank(0, rank, root, droplist, args) - print() # write a newline - """ + root.index = -1 #unique = df.drop_duplicates(subset=["node index", "parent index", "name"], keep='first') + df['visited'] = False graphRank2(0, df, root, droplist, args) roots = [root] diff --git a/src/scripts/apex_exec b/src/scripts/apex_exec index 2ba201db..7dfe3def 100755 --- a/src/scripts/apex_exec +++ b/src/scripts/apex_exec @@ -69,8 +69,6 @@ where APEX options are zero or more of: --apex:gpu-memory enable GPU memory wrapper support --apex:cpu-memory enable CPU memory wrapper support --apex:delay-memory delay memory wrapper support until explicitly enabled - --apex:untied enable tasks to migrate cores/OS threads - during execution (not compatible with trace output) --apex:cuda enable CUDA/CUPTI measurement (default: off) --apex:cuda-counters enable CUDA/CUPTI counter support (default: off) --apex:cuda-driver enable CUDA driver API callbacks (default: off) @@ -100,6 +98,16 @@ where APEX options are zero or more of: (LD_PRELOAD value is added _after_ APEX libraries) --apex:postprocess run post-process scripts (graphviz, python) on output data after exit " + # Disable the async thread, and run the header program + export APEX_PROC_CPUINFO=0 + export APEX_PROC_LOADAVG=0 + export APEX_PROC_MEMINFO=0 + export APEX_PROC_NET_DEV=0 + export APEX_PROC_SELF_STATUS=0 + export APEX_PROC_SELF_IO=0 + export APEX_PROC_STAT=0 + export APEX_PROC_STAT_DETAILS=0 + ${SCRIPTPATH}/apex_header echo "${message}" exit 1 } @@ -135,7 +143,6 @@ hip_driver=no hip_details=no level0=no monitor_gpu=no -untied=no cpuinfo=no meminfo=no ompt=no @@ -421,11 +428,6 @@ while (( "$#" )); do export APEX_PROC_SELF_IO=1 shift ;; - --apex:untied) - untied=yes - export APEX_UNTIED_TIMERS=1 - shift - ;; --apex:mpi) mpi=yes export APEX_ENABLE_MPI=1 @@ -539,6 +541,24 @@ while (( "$#" )); do usage fi ;; +# begin internal use only + --apex:gprofiler) + export USER_PRELOAD="@GPERFTOOLS_ROOT@/lib/libprofiler.so:" + export CPUPROFILE_PPROF="@GPERFTOOLS_ROOT@/bin/pprof" + export CPUPROFILE=/tmp/prof.out + shift + ;; + --apex:no-async-counters) + export APEX_PROC_CPUINFO=0 + export APEX_PROC_LOADAVG=0 + export APEX_PROC_MEMINFO=0 + export APEX_PROC_NET_DEV=0 + export APEX_PROC_SELF_STATUS=0 + export APEX_PROC_SELF_IO=0 + export APEX_PROC_STAT=0 + export APEX_PROC_STAT_DETAILS=0 + shift + ;; --apex:help|--help|-h) if [ $apex_opts = yes ] ; then usage @@ -670,7 +690,6 @@ fi # remove spaces and double colons APEX_LD_PRELOAD=`echo ${APEX_LD_PRELOAD} | sed -e "s/ /:/g" -e "s/::/:/g" -e "s/:$//"` -export CPUPROFILE=two_vars.prof if [ ${apple} = 1 ]; then APEX_LDD='otool -L' @@ -868,9 +887,9 @@ else retval=$? unset LD_PRELOAD unset DYLD_INSERT_LIBRARIES - if [ "${myrank}" == "0" ] ; then - rm -f ${gdbcmds} - fi + #if [ "${myrank}" == "0" ] ; then + #rm -f ${gdbcmds} + #fi if [ ${retval} != 0 ] ; then echo "Error ${retval}!" count=`ldd ${MPI_LIB2} | grep -c tcmalloc` @@ -878,7 +897,17 @@ else preloadme=`ldd ${MPI_LIB2} | grep tcmalloc | awk '{ print $3 }'` echo "If you got some kind of tcmalloc error, please preload the dependent tcmalloc shared object library with '--apex:preload ${preloadme}'" fi - exit ${retval} + if [ -z ${CPUPROFILE_PPROF} ] ; then + exit ${retval} + fi + fi +fi + +if [ ! -z ${CPUPROFILE_PPROF} ] ; then + if [ -f ${CPUPROFILE} ] ; then + set -x + ${CPUPROFILE_PPROF} --text ${prog} ${CPUPROFILE} + set +x fi fi diff --git a/src/unit_tests/C++/CMakeLists.txt b/src/unit_tests/C++/CMakeLists.txt index 6b9e7e14..80f04fae 100644 --- a/src/unit_tests/C++/CMakeLists.txt +++ b/src/unit_tests/C++/CMakeLists.txt @@ -47,6 +47,8 @@ set(example_programs apex_swap_threads apex_malloc apex_std_thread + apex_multiple_parents + apex_taskstubs ${APEX_OPENMP_TEST} ) #apex_set_thread_cap @@ -82,10 +84,12 @@ foreach(example_program ${example_programs}) set(sources ${example_program}.cpp) source_group("Source Files" FILES ${sources}) add_executable("${example_program}_cpp" ${sources}) - target_link_libraries ("${example_program}_cpp" apex ${LIBS}) + target_link_libraries ("${example_program}_cpp" apex ${LIBS} timer_plugin) if (BUILD_STATIC_EXECUTABLES) set_target_properties("${example_program}_cpp" PROPERTIES LINK_SEARCH_START_STATIC 1 LINK_SEARCH_END_STATIC 1) endif() + # This is needed to make sure local symbols are exported and we can dladdr them + set_property(TARGET "${example_program}_cpp" PROPERTY ENABLE_EXPORTS ON) add_dependencies ("${example_program}_cpp" apex) add_dependencies (tests "${example_program}_cpp") add_test ("test_${example_program}_cpp" "${example_program}_cpp") @@ -130,19 +134,28 @@ include_directories (. ${APEX_SOURCE_DIR}/src/apex ${MPI_CXX_INCLUDE_PATH}) # Make sure the linker can find the Apex library once it is built. link_directories (${APEX_BINARY_DIR}/src/apex) -# Add executable called "apex_hpx_annotated_functions_mpi" that is built from the source file -# "apex_hpx_annotated_functions.cpp". The extensions are automatically found. -add_executable (apex_hpx_annotated_functions_mpi apex_hpx_annotated_functions.cpp) -add_dependencies (apex_hpx_annotated_functions_mpi apex) -add_dependencies (examples apex_hpx_annotated_functions_mpi) - -# Link the executable to the Apex library. -target_link_libraries (apex_hpx_annotated_functions_mpi apex ${MPI_CXX_LINK_FLAGS} ${MPI_CXX_LIBRARIES} ${LIBS} ${APEX_STDCXX_LIB} m) -if (BUILD_STATIC_EXECUTABLES) - set_target_properties(apex_hpx_annotated_functions_mpi PROPERTIES LINK_SEARCH_START_STATIC 1 LINK_SEARCH_END_STATIC 1) -endif() +if(APEX_WITH_MPI) + # Add executable called "apex_hpx_annotated_functions_mpi" that is built from the source file + # "apex_hpx_annotated_functions.cpp". The extensions are automatically found. + add_executable (apex_hpx_annotated_functions_mpi apex_hpx_annotated_functions.cpp) + target_compile_definitions(apex_hpx_annotated_functions_mpi PUBLIC -DAPEX_ENABLE_MPI) + add_dependencies (apex_hpx_annotated_functions_mpi apex) + add_dependencies (examples apex_hpx_annotated_functions_mpi) + add_executable (apex_multiple_parents_mpi apex_multiple_parents.cpp) + target_compile_definitions(apex_multiple_parents_mpi PUBLIC -DAPEX_ENABLE_MPI) + add_dependencies (apex_multiple_parents_mpi apex) + add_dependencies (examples apex_multiple_parents_mpi) + + # Link the executable to the Apex library. + target_link_libraries (apex_hpx_annotated_functions_mpi apex ${MPI_CXX_LINK_FLAGS} ${MPI_CXX_LIBRARIES} ${LIBS} ${APEX_STDCXX_LIB} m) + target_link_libraries (apex_multiple_parents_mpi apex ${MPI_CXX_LINK_FLAGS} ${MPI_CXX_LIBRARIES} ${LIBS} ${APEX_STDCXX_LIB} m) + if (BUILD_STATIC_EXECUTABLES) + set_target_properties(apex_hpx_annotated_functions_mpi PROPERTIES LINK_SEARCH_START_STATIC 1 LINK_SEARCH_END_STATIC 1) + set_target_properties(apex_multiple_parents_mpi PROPERTIES LINK_SEARCH_START_STATIC 1 LINK_SEARCH_END_STATIC 1) + endif() -INSTALL(TARGETS apex_hpx_annotated_functions_mpi - RUNTIME DESTINATION bin OPTIONAL -) + INSTALL(TARGETS apex_hpx_annotated_functions_mpi apex_multiple_parents_mpi + RUNTIME DESTINATION bin OPTIONAL + ) +endif() diff --git a/src/unit_tests/C++/apex_hpx_annotated_functions.cpp b/src/unit_tests/C++/apex_hpx_annotated_functions.cpp index 7c664924..c9dbc338 100644 --- a/src/unit_tests/C++/apex_hpx_annotated_functions.cpp +++ b/src/unit_tests/C++/apex_hpx_annotated_functions.cpp @@ -5,7 +5,7 @@ #include #include #include "apex_api.hpp" -#if defined(APEX_WITH_MPI) +#if defined(APEX_ENABLE_MPI) #include #endif @@ -98,7 +98,7 @@ void* someThread(void* tmp) int main (int argc, char** argv) { /* initialize APEX */ - #if defined(APEX_WITH_MPI) + #if defined(APEX_ENABLE_MPI) MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank); MPI_Comm_size(MPI_COMM_WORLD, &comm_size); @@ -153,7 +153,7 @@ int main (int argc, char** argv) { } } apex::cleanup(); - #if defined(APEX_WITH_MPI) + #if defined(APEX_ENABLE_MPI) MPI_Finalize(); #endif return 0; diff --git a/src/unit_tests/C++/apex_hpx_direct_actions.cpp b/src/unit_tests/C++/apex_hpx_direct_actions.cpp index 258a777a..bad39a1c 100644 --- a/src/unit_tests/C++/apex_hpx_direct_actions.cpp +++ b/src/unit_tests/C++/apex_hpx_direct_actions.cpp @@ -19,6 +19,8 @@ const int num_iterations = 10; pthread_barrier_t barrier; #endif +std::atomic guid{32}; + int nsleep(long miliseconds, int tid) { struct timespec req, rem; @@ -47,7 +49,7 @@ int nsleep(long miliseconds, int tid) } void innerLoop(int *tid) { - std::shared_ptr tt_ptr = apex::new_task(__func__); + std::shared_ptr tt_ptr = apex::new_task(__func__, guid++); #ifdef __DEBUG_PRINT__ std::stringstream buf; buf << "APP: " << *tid << ": Starting thread " << tt_ptr->guid << "\n"; std::cout << buf.str(); @@ -65,7 +67,7 @@ void innerLoop(int *tid) { #endif /* Start a timer like an "direct_action" */ - std::shared_ptr af = apex::new_task("direct_action", UINTMAX_MAX, tt_ptr); + std::shared_ptr af = apex::new_task("direct_action", guid++, tt_ptr); #ifdef __DEBUG_PRINT__ buf.str(""); buf.clear(); buf << "APP: " << *tid << ": Starting direct_action " << af->guid << "\n"; std::cout << buf.str(); @@ -126,11 +128,12 @@ void* someThread(void* tmp) #endif apex::register_thread(name); - apex::profiler* p = apex::start(__func__); + auto task = apex::new_task(__func__, guid++); + apex::start(task); for (int i = 0 ; i < num_iterations ; i++) { innerLoop(tid); } - apex::stop(p); + apex::stop(task); /* tell APEX that this thread is exiting */ apex::exit_thread(); @@ -142,8 +145,11 @@ int main (int argc, char** argv) { apex::init("apex::start unit test", 0, 1); /* important, to make sure we get correct profiles at the end */ apex::apex_options::use_screen_output(true); + /* disable untied timers! not yet supported with direct actions. */ + //apex::apex_options::untied_timers(false); /* start a timer */ - apex::profiler* p = apex::start("main"); + auto task = apex::new_task("main", guid); + apex::start(task); /* Spawn X threads */ if (argc > 1) { test_numthreads = strtoul(argv[1],NULL,0); @@ -169,7 +175,7 @@ int main (int argc, char** argv) { free(tids); free(thread); /* stop our main timer */ - apex::stop(p); + apex::stop(task); /* finalize APEX */ apex::finalize(); apex_profile * profile1 = apex::get_profile("direct_action"); diff --git a/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp b/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp index b5df8404..3217d89d 100644 --- a/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp +++ b/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp @@ -147,6 +147,8 @@ int main (int argc, char** argv) { apex::init("apex::start unit test", 0, 1); /* important, to make sure we get correct profiles at the end */ apex::apex_options::use_screen_output(true); + /* disable untied timers! not yet supported with direct actions. */ + //apex::apex_options::untied_timers(false); /* start a timer */ apex::profiler* p = apex::start("main"); /* Spawn X threads */ diff --git a/src/unit_tests/C++/apex_multiple_parents.cpp b/src/unit_tests/C++/apex_multiple_parents.cpp new file mode 100644 index 00000000..9d550d56 --- /dev/null +++ b/src/unit_tests/C++/apex_multiple_parents.cpp @@ -0,0 +1,71 @@ +#include +#include +#include +#include +#include "apex_api.hpp" +#include +#ifdef APEX_ENABLE_MPI +#include "mpi.h" +#endif + +int parent (int in, std::shared_ptr< apex::task_wrapper > this_task) { + apex::start(this_task); + usleep(in*100); + apex::stop(this_task); + std::cout << "p1" << std::endl; + return in; +} + +int child (int in, std::shared_ptr< apex::task_wrapper > this_task) { + apex::start(this_task); + usleep(in*100); + std::cout << "c" << std::endl; + apex::stop(this_task); + return in; +} + +int main([[maybe_unused]] int argc, [[maybe_unused]] char *argv[]) { + int comm_rank = 0; + int comm_size = 1; +#ifdef APEX_ENABLE_MPI + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank); + MPI_Comm_size(MPI_COMM_WORLD, &comm_size); + std::cout << "APP: rank " << comm_rank << " of " << comm_size << std::endl; +#endif + apex::init("apex_multiple_parents_cpp unit test", comm_rank, comm_size); + apex::scoped_timer foo(__func__); + + int task_id{0}; + // called from the parent, not when the child is spawned! + std::vector> parents; + auto p1 = apex::new_task(std::string("parent1"), ++task_id); + auto f1 = std::async(std::launch::async, parent, task_id, p1); + parents.push_back(p1); + + auto p2 = apex::new_task(std::string("parent2"), ++task_id); + auto f2 = std::async(std::launch::async, parent, task_id, p2); + parents.push_back(p2); + + auto p3 = apex::new_task(std::string("parent3"), ++task_id); + auto f3 = std::async(std::launch::async, parent, task_id, p3); + parents.push_back(p3); + + auto p4 = apex::new_task(std::string("parent4"), ++task_id); + auto f4 = std::async(std::launch::async, parent, task_id, p4); + parents.push_back(p4); + + auto c = apex::new_task(std::string("child"), ++task_id, parents); + auto f5 = std::async(std::launch::async, child, (int)task_id, c); + + int result = f1.get() + f2.get() + f5.get() + f3.get() + f4.get(); + std::cout << "sum is " << result << " (valid value: 6)" << std::endl; + foo.stop(); + //apex::finalize(); +#ifdef APEX_ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + MPI_Finalize(); +#endif + return 0; +} + diff --git a/src/unit_tests/C++/apex_swap_threads.cpp b/src/unit_tests/C++/apex_swap_threads.cpp index 9a90ddcd..532002e9 100644 --- a/src/unit_tests/C++/apex_swap_threads.cpp +++ b/src/unit_tests/C++/apex_swap_threads.cpp @@ -27,7 +27,6 @@ int main (int argc, char** argv) { APEX_UNUSED(argv); init("apex::swap thread unit test", 0, 1); cout << "APEX Version : " << version() << endl; - apex_options::untied_timers(true); apex_options::use_screen_output(true); apex_options::print_options(); pthread_t thread[2]; diff --git a/src/unit_tests/C++/apex_taskstubs.cpp b/src/unit_tests/C++/apex_taskstubs.cpp new file mode 100644 index 00000000..7b1fa669 --- /dev/null +++ b/src/unit_tests/C++/apex_taskstubs.cpp @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2014-2021 Kevin Huck + * Copyright (c) 2014-2021 University of Oregon + * + * Distributed under the BSD 2-Clause Software License. (See accompanying + * file LICENSE) + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include "timer_plugin/tasktimer.h" + +/* ISO C doesn't allow __PRETTY_FUNCTION__, so only do it with C++ */ +#if defined(__GNUC__) && defined(__cplusplus) +#define __APEX__FUNCTION__ __PRETTY_FUNCTION__ +#else +#define __APEX__FUNCTION__ __func__ +#endif + +uint64_t _my_gettid(void) { + pid_t x = syscall(SYS_gettid); + return (uint64_t)(x); +} + +/* This simple example is truly overkill, but it tests all aspects of the API. */ + +std::atomic guid{0}; + +void A(uint64_t); +void B(uint64_t, uint64_t); +void C(uint64_t, uint64_t); +void D(void); +void E(void); +void F(void); +void xfer(uint64_t parent); + +void A(uint64_t parent) { + uint64_t parents[] = {parent}; + uint64_t myguid = guid++; + // both address and name + TASKTIMER_CREATE(&A, __APEX__FUNCTION__, myguid, parents, 1, tt_A); + tasktimer_argument_value_t args[1]; + args[0].type = TASKTIMER_LONG_INTEGER_TYPE; + args[0].l_value = parent; + TASKTIMER_SCHEDULE(tt_A, args, 1); + tasktimer_execution_space_t resource; + resource.type = TASKTIMER_DEVICE_CPU; + resource.device_id = 0; + resource.instance_id = _my_gettid(); + TASKTIMER_START(tt_A, &resource); + B(parent, myguid); + C(parent, myguid); + TASKTIMER_STOP(tt_A); +} + +void B(uint64_t parent1, uint64_t parent2) { + uint64_t parents[] = {parent1, parent2}; + uint64_t myguid = guid++; + // both address and name + TASKTIMER_CREATE(&B, __APEX__FUNCTION__, myguid, parents, 2, tt_B); + tasktimer_argument_value_t args[2]; + args[0].type = TASKTIMER_LONG_INTEGER_TYPE; + args[0].l_value = parent1; + args[1].type = TASKTIMER_LONG_INTEGER_TYPE; + args[1].l_value = parent2; + TASKTIMER_SCHEDULE(tt_B, args, 2); + tasktimer_execution_space_t resource; + resource.type = TASKTIMER_DEVICE_CPU; + resource.device_id = 0; + resource.instance_id = _my_gettid(); + TASKTIMER_START(tt_B, &resource); + TASKTIMER_STOP(tt_B); +} + +void C(uint64_t parent1, uint64_t parent2) { + uint64_t parents[] = {parent1, parent2}; + uint64_t myguid = guid++; + // no name, just address + TASKTIMER_CREATE(&C, nullptr, myguid, parents, 2, tt_C); + tasktimer_argument_value_t args[2]; + args[0].type = TASKTIMER_LONG_INTEGER_TYPE; + args[0].l_value = parent1; + args[1].type = TASKTIMER_LONG_INTEGER_TYPE; + args[1].l_value = parent2; + TASKTIMER_SCHEDULE(tt_C, args, 2); + tasktimer_execution_space_t resource; + resource.type = TASKTIMER_DEVICE_CPU; + resource.device_id = 0; + resource.instance_id = _my_gettid(); + TASKTIMER_START(tt_C, &resource); + D(); + xfer(myguid); + E(); + xfer(myguid); + F(); + TASKTIMER_STOP(tt_C); +} + +void D(void) { + TASKTIMER_COMMAND_START(__APEX__FUNCTION__); + TASKTIMER_COMMAND_STOP(); +} + +void E(void) { + TASKTIMER_COMMAND_START(__APEX__FUNCTION__); + TASKTIMER_COMMAND_STOP(); +} + +void F(void) { + TASKTIMER_COMMAND_START(__APEX__FUNCTION__); + TASKTIMER_COMMAND_STOP(); +} + +void xfer(uint64_t parent) { + constexpr uint64_t maxlen = 1024; + std::array source{1}; + std::array dest{0}; + tasktimer_execution_space_t source_info, dest_info; + tasktimer_execution_space_p sip = &source_info; + tasktimer_execution_space_p dip = &dest_info; + source_info.type = TASKTIMER_DEVICE_CPU; + source_info.device_id = 0; + source_info.instance_id = 0; + dest_info.type = TASKTIMER_DEVICE_CPU; + dest_info.device_id = 0; + dest_info.instance_id = 0; + TASKTIMER_DATA_TRANSFER_START(parent, sip, "source", source.data(), dip, "dest", dest.data()); + std::copy(std::begin(source), std::end(source), std::begin(dest)); + TASKTIMER_DATA_TRANSFER_STOP(parent); +} + +tasktimer_execution_space_t make_resource(void){ + tasktimer_execution_space_t resource; + resource.type = TASKTIMER_DEVICE_CPU; + resource.device_id = 0; + resource.instance_id = _my_gettid(); + return resource; +} + +void add_parent_test(uint64_t parent) { + uint64_t parents[] = {parent}; + uint64_t myguid = guid++; + // both address and name + TASKTIMER_CREATE(nullptr, __APEX__FUNCTION__, myguid, parents, 1, tt_add_parent_test); + TASKTIMER_SCHEDULE(tt_add_parent_test, nullptr, 0); + auto resource = make_resource(); + TASKTIMER_START(tt_add_parent_test, &resource); + // make a new timer with no parent + uint64_t newparent = guid++; + TASKTIMER_CREATE(nullptr, "added_parent", newparent, nullptr, 0, tt_newparent); + TASKTIMER_ADD_PARENTS(tt_newparent, parents, 1); + TASKTIMER_SCHEDULE(tt_newparent, nullptr, 0); + TASKTIMER_START(tt_newparent, &resource); + TASKTIMER_STOP(tt_newparent); + TASKTIMER_STOP(tt_add_parent_test); +} + +void add_child_test(tasktimer_timer_t parent) { + // create without a parent + uint64_t myguid = guid++; + TASKTIMER_CREATE(nullptr, __APEX__FUNCTION__, myguid, nullptr, 0, tt_add_child_test); + TASKTIMER_SCHEDULE(tt_add_child_test, nullptr, 0); + auto resource = make_resource(); + TASKTIMER_START(tt_add_child_test, &resource); + // make another timer with no parent + uint64_t newchild = guid++; + TASKTIMER_CREATE(nullptr, "added_child", newchild, nullptr, 0, tt_newchild); + uint64_t children[] = {myguid,newchild}; + TASKTIMER_ADD_CHILDREN(parent, children, 2); + TASKTIMER_SCHEDULE(tt_newchild, nullptr, 0); + TASKTIMER_START(tt_newchild, &resource); + TASKTIMER_STOP(tt_newchild); + TASKTIMER_STOP(tt_add_child_test); +} + +int main(int argc, char * argv[]) { + // initialize the timer plugin + TASKTIMER_INITIALIZE(); + uint64_t myguid = guid++; + // no address, just name + TASKTIMER_CREATE(nullptr, __APEX__FUNCTION__, myguid, nullptr, 0, tt); + // schedule the task + TASKTIMER_SCHEDULE(tt, nullptr, 0); + // execute the task on CPU 0, thread_id + tasktimer_execution_space_t resource; + resource.type = TASKTIMER_DEVICE_CPU; + resource.device_id = 0; + resource.instance_id = _my_gettid(); + TASKTIMER_START(tt, &resource); + // yield the task + TASKTIMER_YIELD(tt); + // run a "child" task + A(myguid); + // test the "add_parent" feature + add_parent_test(myguid); + // test the "add_child" feature + add_child_test(tt); + // resume the task + TASKTIMER_RESUME(tt, &resource); + // stop the task + TASKTIMER_STOP(tt); + // finalize the timer plugin + TASKTIMER_FINALIZE(); + return 0; +} diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index dc9576e9..703536a7 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -8,6 +8,7 @@ link_directories (${APEX_BINARY_DIR}/src/apex) set(util_programs apex_make_default_config apex_environment_help + apex_header ) foreach(util_program ${util_programs}) diff --git a/src/utils/apex_header.cpp b/src/utils/apex_header.cpp new file mode 100644 index 00000000..ace1c580 --- /dev/null +++ b/src/utils/apex_header.cpp @@ -0,0 +1,16 @@ +#include "apex_api.hpp" +#include +#include +#include + +using namespace apex; +using namespace std; + +int main (int argc, char** argv) { + APEX_UNUSED(argc); + APEX_UNUSED(argv); + init("apex_header", 0, 1); + finalize(); + return 0; +} +