Skip to content

Commit

Permalink
Lots of debugging for untied tasks and validating task state. Added l…
Browse files Browse the repository at this point in the history
…ots of assertions for debug build checking. Still have some issues with direct actions, but those should be fixed soon, then untied_tasks will be the default.
  • Loading branch information
khuck committed Sep 7, 2024
1 parent ef946a6 commit f39b24b
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 114 deletions.
266 changes: 163 additions & 103 deletions src/apex/apex.cpp

Large diffs are not rendered by default.

81 changes: 81 additions & 0 deletions src/apex/apex_error_handling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,84 @@ void apex_test_signal_handler() {
apex_custom_signal_handler(1);
}

std::vector<apex::profiler*>& profilers_to_exit(void) {
static std::vector<apex::profiler*> _thevector;
return _thevector;
}

std::atomic<size_t> threads_to_exit_count{0};

//static void apex_custom_signal_handler_thread_exit([[maybe_unused]] int sig) {
static void apex_custom_signal_handler_thread_exit(
[[maybe_unused]] int sig,
[[maybe_unused]] siginfo_t * info,
[[maybe_unused]] void * context) {
APEX_ASSERT(sig == SIGUSR2);
if (apex::apex_options::untied_timers()) {
auto p = apex::thread_instance::get_current_profiler();
apex::profiler* parent = nullptr;
while(p != nullptr) {
if (p->untied_parent == nullptr || p->untied_parent->state != apex::task_wrapper::RUNNING) {
parent = nullptr;
} else {
parent = p->untied_parent->prof;
}
// only push profilers that were started on THIS thread...
if (p != nullptr && p->thread_id == apex::thread_instance::instance().get_id()) {
profilers_to_exit().push_back(p);
}
p = parent;
}
} else {
// get the timer stack, in reverse order
auto& stack = apex::thread_instance::get_current_profilers();
if (stack.size() > 0) {
for (size_t i = stack.size() ; i > 0 ; i--) {
profilers_to_exit().push_back(stack[i-1]);
}
}
}
threads_to_exit_count--;
return;
}

int apex_register_thread_cleanup(void) {
static bool doOnce{false};
if (doOnce) return 0;
struct sigaction act;
struct sigaction old;
memset(&act, 0, sizeof(act));
memset(&old, 0, sizeof(old));
sigemptyset(&act.sa_mask);
std::array<int,1> mysignals = { SIGUSR2 };
act.sa_flags = SA_RESTART | SA_SIGINFO;
act.sa_sigaction = apex_custom_signal_handler_thread_exit;
for (auto s : mysignals) {
sigaction(s, &act, &old);
#ifdef APEX_BE_GOOD_CITIZEN
other_handlers[s] = old;
#endif
}
doOnce = true;
return 0;
}

void apex_signal_all_threads(void) {
auto tids = apex::thread_instance::gettids();
pthread_t me = pthread_self();
// generous...but not a hard limit. Trying not to allocate memory during the signal handler, above.
profilers_to_exit().reserve(tids.size() * 10);
// don't include myself
threads_to_exit_count = tids.size() - 1;
for (auto t : tids) {
if (t != me) {
pthread_kill(t,SIGUSR2);
}
}
while(threads_to_exit_count > 0) { //wait!
}
for (auto p : profilers_to_exit()) {
apex::stop(p);
}
profilers_to_exit().clear();
}
3 changes: 3 additions & 0 deletions src/apex/apex_error_handling.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,8 @@
void apex_print_backtrace();
void apex_custom_signal_handler(int sig);
int apex_register_signal_handler();
int apex_register_thread_cleanup();
int apex_custom_signal_handler_thread_exit(int sig);
void apex_test_signal_handler();
void apex_signal_all_threads();

10 changes: 9 additions & 1 deletion src/apex/task_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ struct task_wrapper : public hpx::util::external_timer::task_wrapper {
#else
struct task_wrapper {
#endif
typedef enum e_task_state {
CREATED,
RUNNING,
YIELDED,
STOPPED
} task_state_t;
/**
\brief A pointer to the task_identifier for this task_wrapper.
*/
Expand Down Expand Up @@ -82,6 +88,7 @@ struct task_wrapper {
\brief Whether this event requires separate start/end events in gtrace
*/
bool explicit_trace_start;
task_state_t state;
/**
\brief Constructor.
*/
Expand All @@ -94,7 +101,8 @@ struct task_wrapper {
alias(nullptr),
thread_id(0UL),
create_ns(our_clock::now_ns()),
explicit_trace_start(false)
explicit_trace_start(false),
state(CREATED)
{ }
/**
\brief Destructor.
Expand Down
33 changes: 30 additions & 3 deletions src/apex/taskstubs_implementation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,31 @@ maptype& getMyMap(void) {
}

void safePrint(const char * format, tasktimer_guid_t guid) {
std::scoped_lock lock{mtx()};
static std::mutex local_mtx;
std::scoped_lock lock{local_mtx};
printf("%lu %s GUID %lu\n", apex::thread_instance::get_id(), format, guid);
return;
}

void safeInsert(
tasktimer_guid_t guid,
std::shared_ptr<apex::task_wrapper> task) {
#if 0
{
std::scoped_lock lock{mtx()};
getCommonMap()[guid] = task;
}
getMyMap()[guid] = task;
#else
std::scoped_lock lock{mtx()};
getCommonMap()[guid] = task;
#endif
//safePrint("Inserted", guid);
}

std::shared_ptr<apex::task_wrapper> safeLookup(
tasktimer_guid_t guid) {
#if 0
// in the thread local map?
auto task = getMyMap().find(guid);
if (task == getMyMap().end()) {
Expand All @@ -61,10 +68,20 @@ std::shared_ptr<apex::task_wrapper> safeLookup(
}
if (task == getCommonMap().end()) {
safePrint("Not found", guid);
APEX_ASSERT(false);
return nullptr;
}
getMyMap()[guid] = task->second;
}
#else
std::scoped_lock lock{mtx()};
auto task = getCommonMap().find(guid);
if (task == getCommonMap().end()) {
safePrint("Not found", guid);
//APEX_ASSERT(false);
return nullptr;
}
#endif
//safePrint("Found", guid);
return task->second;
}
Expand Down Expand Up @@ -109,7 +126,7 @@ extern "C" {
}
// if no name, use address
if (timer_name == nullptr || strlen(timer_name) == 0) {
printf("Null name for timer: %p\n", function_address);
//printf("Null name for timer: %p\n", function_address);
if (parent_count > 0) {
auto task = apex::new_task(
(apex_function_address)function_address,
Expand Down Expand Up @@ -159,32 +176,42 @@ extern "C" {
tasktimer_execution_space_p) {
// TODO: capture the execution space, somehow...a new task?
MAP_TASK(timer, apex_timer);
//printf("%lu TS: Starting: %lu\n", apex::thread_instance::get_id(), timer);
if (apex_timer != nullptr) {
apex::start(apex_timer);
}
}
void tasktimer_yield_impl(
tasktimer_timer_t timer) {
#if 1
static bool& over = apex::get_program_over();
if (over) return;
MAP_TASK(timer, apex_timer);
//printf("%lu TS: Yielding: %lu\n", apex::thread_instance::get_id(), timer);
if (apex_timer != nullptr) {
apex::yield(apex_timer);
}
#endif
}
void tasktimer_resume_impl(
tasktimer_timer_t timer,
tasktimer_execution_space_p) {
#if 1
// TODO: capture the execution space, somehow...a new task?
MAP_TASK(timer, apex_timer);
//printf("%lu TS: Resuming: %lu\n", apex::thread_instance::get_id(), timer);
// TODO: why no resume function for task_wrapper objects?
if (apex_timer != nullptr) {
apex::start(apex_timer);
apex::resume(apex_timer);
} else {
APEX_ASSERT(false);
}
#endif
}
void tasktimer_stop_impl(
tasktimer_timer_t timer) {
MAP_TASK(timer, apex_timer);
//printf("%lu TS: Stopping: %lu\n", apex::thread_instance::get_id(), timer);
if (apex_timer != nullptr) {
apex::stop(apex_timer);
}
Expand Down
3 changes: 3 additions & 0 deletions src/apex/thread_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ string thread_instance::map_addr_to_name(apex_function_address function_address)

void thread_instance::set_current_profiler(profiler * the_profiler) {
if (apex_options::untied_timers() == true) {
APEX_ASSERT(the_profiler != nullptr && the_profiler->tt_ptr != nullptr);
// make the previous profiler on the "stack" the parent of this profiler
the_profiler->untied_parent = instance().untied_current_profiler;
// make this profiler the new top of the "stack"
Expand Down Expand Up @@ -377,6 +378,8 @@ void thread_instance::clear_current_profiler(profiler * the_profiler,

profiler * thread_instance::get_current_profiler(void) {
if (apex_options::untied_timers() == true) {
//APEX_ASSERT(instance().untied_current_profiler != nullptr);
//APEX_ASSERT(instance().untied_current_profiler->prof != nullptr);
if (instance().untied_current_profiler == nullptr) {
return nullptr;
}
Expand Down
11 changes: 10 additions & 1 deletion src/apex/thread_instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class common_data {
public:
// map from name to thread id - common to all threads
std::map<std::string, int> _name_map;
std::set<pthread_t> _tids;
std::mutex _name_map_mutex;
shared_mutex_type _function_map_mutex;
// map from thread id to is_worker
Expand Down Expand Up @@ -115,6 +116,8 @@ class thread_instance {
}
std::cout << "thread " << _id << " starting... " << __APEX_FUNCTION__ << std::endl;
}
std::lock_guard<std::mutex> l{common()._name_map_mutex};
common()._tids.insert(pthread_self());
};
// private default constructor
thread_instance () = delete;
Expand Down Expand Up @@ -163,9 +166,11 @@ class thread_instance {
static void clear_current_profiler() {
instance().current_profilers.pop_back();
}
static std::vector<profiler*>& get_current_profilers(void) { return instance().current_profilers; }
static void clear_untied_current_profiler() {
auto tmp = instance().untied_current_profiler;
if (tmp == nullptr) return;
//APEX_ASSERT(tmp != nullptr && tmp->prof != nullptr);
if (tmp == nullptr || tmp->prof == nullptr) return;
instance().untied_current_profiler = tmp->prof->untied_parent;
}
static const char * program_path(void);
Expand All @@ -184,6 +189,10 @@ class thread_instance {
instance(false)._exiting = true;
}
void clear_all_profilers(void);
static std::set<pthread_t> gettids(void) {
std::lock_guard<std::mutex> l{common()._name_map_mutex};
return common()._tids;
}

#ifdef APEX_DEBUG
static std::mutex _open_profiler_mutex;
Expand Down
12 changes: 6 additions & 6 deletions src/unit_tests/C++/apex_taskstubs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void C(uint64_t, uint64_t);
void D(void);
void E(void);
void F(void);
void xfer(void);
void xfer(uint64_t parent);

void A(uint64_t parent) {
uint64_t parents[] = {parent};
Expand Down Expand Up @@ -96,9 +96,9 @@ void C(uint64_t parent1, uint64_t parent2) {
resource.instance_id = _my_gettid();
TASKTIMER_START(tt_C, &resource);
D();
xfer();
xfer(myguid);
E();
xfer();
xfer(myguid);
F();
TASKTIMER_STOP(tt_C);
}
Expand All @@ -118,7 +118,7 @@ void F(void) {
TASKTIMER_COMMAND_STOP();
}

void xfer(void) {
void xfer(uint64_t parent) {
constexpr uint64_t maxlen = 1024;
std::array<uint64_t, maxlen> source{1};
std::array<uint64_t, maxlen> dest{0};
Expand All @@ -131,9 +131,9 @@ void xfer(void) {
dest_info.type = TASKTIMER_DEVICE_CPU;
dest_info.device_id = 0;
dest_info.instance_id = 0;
TASKTIMER_DATA_TRANSFER_START(100, sip, "source", source.data(), dip, "dest", dest.data());
TASKTIMER_DATA_TRANSFER_START(parent, sip, "source", source.data(), dip, "dest", dest.data());
std::copy(std::begin(source), std::end(source), std::begin(dest));
TASKTIMER_DATA_TRANSFER_STOP(100);
TASKTIMER_DATA_TRANSFER_STOP(parent);
}

tasktimer_execution_space_t make_resource(void){
Expand Down

0 comments on commit f39b24b

Please sign in to comment.