Skip to content

Commit

Permalink
Cleaning up memory leaks, and converting the dependency tree to share…
Browse files Browse the repository at this point in the history
…d pointers due to multiple parents and multiple children - cleaning it up becomes a nightmare otherwise. Also adding some utilities to the apex_exec script to help with testing.
  • Loading branch information
khuck committed Sep 4, 2024
1 parent 1f45a22 commit 321ce48
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 57 deletions.
14 changes: 3 additions & 11 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,16 +534,8 @@ uint64_t init(const char * thread_name, uint64_t comm_rank,
* and if so, don't be suspended long enough to enable the main timer. */
bool suspended = apex_options::suspend();
apex_options::suspend(false);
/* For the main thread, we should always start a top level timer.
* The reason is that if the program calls "exit", our atexit() processing
* will stop this timer, effectively stopping all of its children as well,
* so we will get an accurate measurement for abnormal termination. */
auto main = task_wrapper::get_apex_main_wrapper();
// make sure the tracing support puts APEX MAIN on the right thread
// when tracing HPX - the finalization will almost assuredly not
// be stopped on the thread that is calling apex::init. You've been warned.
main->explicit_trace_start = true;
start(main);
/* No need to start a top level timer here, it happens
in the profiler listener. */
if (apex_options::top_level_os_threads()) {
// start top-level timer for main thread, it will get automatically
// stopped when the main wrapper timer is stopped.
Expand All @@ -556,7 +548,7 @@ uint64_t init(const char * thread_name, uint64_t comm_rank,
task_name = "Main Thread";
}
std::shared_ptr<task_wrapper> twp =
new_task(task_name, UINTMAX_MAX, main);
new_task(task_name, UINTMAX_MAX, task_wrapper::get_apex_main_wrapper());
start(twp);
thread_instance::set_top_level_timer(twp);
}
Expand Down
30 changes: 15 additions & 15 deletions src/apex/dependency_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ std::mutex Node::treeMutex;
std::atomic<size_t> Node::nodeCount{0};
std::set<std::string> Node::known_metrics;

Node* Node::appendChild(task_identifier* c, Node* existing) {
std::shared_ptr<Node> Node::appendChild(task_identifier* c, std::shared_ptr<Node> existing) {
treeMutex.lock();
auto iter = children.find(*c);
if (iter == children.end()) {
if (existing != nullptr) {
existing->parents.push_back(this);
existing->parents.push_back(shared_from_this());
children.insert(std::make_pair(*c,existing));
treeMutex.unlock();
return existing;
} else {
auto n = new Node(c,this);
auto n = std::make_shared<Node>(c,shared_from_this());
//std::cout << "Inserting " << c->get_name() << std::endl;
children.insert(std::make_pair(*c,n));
treeMutex.unlock();
Expand All @@ -45,12 +45,12 @@ Node* Node::appendChild(task_identifier* c, Node* existing) {
return iter->second;
}

Node* Node::replaceChild(task_identifier* old_child, task_identifier* new_child) {
std::shared_ptr<Node> Node::replaceChild(task_identifier* old_child, task_identifier* new_child) {
treeMutex.lock();
auto olditer = children.find(*old_child);
// not found? shouldn't happen...
if (olditer == children.end()) {
auto n = new Node(new_child,this);
auto n = std::make_shared<Node>(new_child,shared_from_this());
//std::cout << "Inserting " << new_child->get_name() << std::endl;
children.insert(std::make_pair(*new_child,n));
treeMutex.unlock();
Expand All @@ -64,7 +64,7 @@ Node* Node::replaceChild(task_identifier* old_child, task_identifier* new_child)
auto newiter = children.find(*new_child);
// not found? shouldn't happen...
if (newiter == children.end()) {
auto n = new Node(new_child,this);
auto n = std::make_shared<Node>(new_child,shared_from_this());
//std::cout << "Inserting " << new_child->get_name() << std::endl;
children.insert(std::make_pair(*new_child,n));
treeMutex.unlock();
Expand All @@ -75,9 +75,9 @@ Node* Node::replaceChild(task_identifier* old_child, task_identifier* new_child)
}

void Node::writeNode(std::ofstream& outfile, double total) {
static std::set<Node*> processed;
if (processed.count(this)) return;
processed.insert(this);
static std::set<std::shared_ptr<Node>> processed;
if (processed.count(shared_from_this())) return;
processed.insert(shared_from_this());
static size_t depth = 0;
// Write out the relationships
for(auto& parent : parents) {
Expand Down Expand Up @@ -127,7 +127,7 @@ void Node::writeNode(std::ofstream& outfile, double total) {
depth--;
}

bool cmp(std::pair<task_identifier, Node*>& a, std::pair<task_identifier, Node*>& b) {
bool cmp(std::pair<task_identifier, std::shared_ptr<Node>>& a, std::pair<task_identifier, std::shared_ptr<Node>>& b) {
return a.second->getAccumulated() > b.second->getAccumulated();
}

Expand Down Expand Up @@ -164,7 +164,7 @@ double Node::writeNodeASCII(std::ofstream& outfile, double total, size_t indent)
outfile << std::endl;

// sort the children by accumulated time
std::vector<std::pair<task_identifier, Node*> > sorted;
std::vector<std::pair<task_identifier, std::shared_ptr<Node>> > sorted;
for (auto& it : children) {
sorted.push_back(it);
}
Expand Down Expand Up @@ -351,9 +351,9 @@ void Node::addAccumulated(double value, double incl, bool is_resume, uint64_t th

double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id, int num_papi_counters) {
static size_t depth = 0;
static std::set<Node*> processed;
if (processed.count(this)) return getAccumulated();
processed.insert(this);
static std::set<std::shared_ptr<Node>> processed;
if (processed.count(shared_from_this())) return getAccumulated();
processed.insert(shared_from_this());
APEX_ASSERT(total > 0.0);
// write out the node id and graph node index and the name
outfile << node_id << "," << index << ",\"[";
Expand Down Expand Up @@ -432,7 +432,7 @@ double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id,
outfile << std::endl;

// sort the children by name to make tree merging easier (I hope)
std::vector<Node*> sorted;
std::vector<std::shared_ptr<Node>> sorted;
for (auto& it : children) {
sorted.push_back(it.second);
}
Expand Down
23 changes: 9 additions & 14 deletions src/apex/dependency_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ class metricStorage {
}
};

class Node {
class Node : public std::enable_shared_from_this<Node> {
private:
task_identifier* data;
std::vector<Node*> parents;
std::vector<std::shared_ptr<Node>> parents;
size_t count;
apex_profile prof;
//double calls;
Expand All @@ -59,14 +59,14 @@ class Node {
double inclusive;
size_t index;
std::set<uint64_t> thread_ids;
std::unordered_map<task_identifier, Node*> children;
std::unordered_map<task_identifier, std::shared_ptr<Node>> children;
// map for arbitrary metrics
std::map<std::string, metricStorage> metric_map;
static std::mutex treeMutex;
static std::atomic<size_t> nodeCount;
static std::set<std::string> known_metrics;
public:
Node(task_identifier* id, Node* p) :
Node(task_identifier* id, std::shared_ptr<Node> p) :
data(id), count(1), inclusive(0),
index(nodeCount.fetch_add(1, std::memory_order_relaxed)) {
parents.push_back(p);
Expand All @@ -77,15 +77,10 @@ class Node {
prof.sum_squares = 0.0;
memset(prof.papi_metrics, 0, sizeof(double)*8);
}
~Node() {
treeMutex.lock();
for (auto c : children) {
delete c.second;
}
treeMutex.unlock();
}
Node* appendChild(task_identifier* c, Node* existing);
Node* replaceChild(task_identifier* old_child, task_identifier* new_child);
// nothing to destruct? because we used smart pointers for parents/children...
~Node() { }
std::shared_ptr<Node> appendChild(task_identifier* c, std::shared_ptr<Node> existing);
std::shared_ptr<Node> replaceChild(task_identifier* old_child, task_identifier* new_child);
task_identifier* getData() { return data; }
size_t getCount() { return count; }
inline double& getCalls() { return prof.calls; }
Expand All @@ -111,7 +106,7 @@ class Node {
return known_metrics;
}
// required for using this class as a key in a map, vector, etc.
static bool compareNodeByParentName (const Node* lhs, const Node* rhs) {
static bool compareNodeByParentName (const std::shared_ptr<Node> lhs, const std::shared_ptr<Node> rhs) {
if (lhs->parents[0]->index < rhs->parents[0]->index) {
return true;
}
Expand Down
9 changes: 9 additions & 0 deletions src/apex/profile_reducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,12 @@ std::map<std::string, apex_profile*> reduce_profiles_for_screen() {
if (mpi_initialized && commsize > 1) {
MPI_CALL(PMPI_Allgather(sbuf, sbuf_length, MPI_CHAR,
rbuf, sbuf_length, MPI_CHAR, MPI_COMM_WORLD));
free(sbuf);
} else {
#else
if (true) {
#endif
// free the unused receive buffer
free(rbuf);
rbuf = sbuf;
}
Expand All @@ -149,6 +151,7 @@ std::map<std::string, apex_profile*> reduce_profiles_for_screen() {
//DEBUG_PRINT("%d Inserting: %s\n", commrank, tmp.c_str());
}
}
free(rbuf);
// the set is already sorted?...
//sort(all_names.begin(), all_names.end());

Expand Down Expand Up @@ -207,11 +210,15 @@ std::map<std::string, apex_profile*> reduce_profiles_for_screen() {
if (mpi_initialized && commsize > 1) {
MPI_CALL(PMPI_Gather(s_pdata, sbuf_length, MPI_DOUBLE,
r_pdata, sbuf_length, MPI_DOUBLE, 0, MPI_COMM_WORLD));
// free the send buffer
free(s_pdata);
} else {
#else
if (true) {
#endif
// free the unused "receive" buffer
free(r_pdata);
// point to the send buffer
r_pdata = s_pdata;
}

Expand Down Expand Up @@ -265,6 +272,8 @@ std::map<std::string, apex_profile*> reduce_profiles_for_screen() {
dptr = &(dptr[num_fields]);
}
}
// free the buffer, receive buffer if MPI, send if not
free (r_pdata);

}
#if defined(APEX_WITH_MPI) || \
Expand Down
8 changes: 6 additions & 2 deletions src/apex/profiler_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,10 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
if (apex_options::process_async_state()) {
finalize_profiles(data, reduced);
}
for (auto itr : reduced) {
free(itr.second);
}
reduced.clear();
}
if (apex_options::use_taskgraph_output())
{
Expand Down Expand Up @@ -2167,18 +2171,18 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
}

void profiler_listener::stop_main_timer(void) {
if (!_main_timer_stopped) {
if (!_main_timer_stopped && !main_timer->stopped) {
APEX_ASSERT(main_timer != nullptr);
main_timer->stop(true);
//_common_stop(main_timer, false);
//on_task_complete(main_timer->tt_ptr);
_main_timer_stopped = true;
push_profiler((unsigned int)thread_instance::get_id(), *main_timer);
}
}

void profiler_listener::on_pre_shutdown(void) {
stop_main_timer();
push_profiler((unsigned int)thread_instance::get_id(), *main_timer);
}

void profiler_listener::push_profiler_public(std::shared_ptr<profiler> &p) {
Expand Down
13 changes: 11 additions & 2 deletions src/apex/task_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct task_wrapper {
/**
\brief A node in the task tree representing this task type
*/
dependency::Node* tree_node;
std::shared_ptr<dependency::Node> tree_node;
/**
\brief Internal usage, used to manage HPX direct actions when their
parent task is yielded by the runtime.
Expand Down Expand Up @@ -96,6 +96,14 @@ struct task_wrapper {
create_ns(our_clock::now_ns()),
explicit_trace_start(false)
{ }
/**
\brief Destructor.
*/
~task_wrapper(void) {
//if (tree_node != nullptr) { delete tree_node; }
if (alias != nullptr) { delete alias; }
}

/**
\brief Get the task_identifier for this task_wrapper.
\returns A pointer to the task_identifier
Expand All @@ -119,7 +127,8 @@ struct task_wrapper {
const std::string apex_main_str(APEX_MAIN_STR);
tt_ptr = std::make_shared<task_wrapper>();
tt_ptr->task_id = task_identifier::get_task_id(apex_main_str);
tt_ptr->tree_node = new dependency::Node(tt_ptr->task_id, nullptr);
std::shared_ptr<dependency::Node> tmp(new dependency::Node(tt_ptr->task_id, nullptr));
tt_ptr->tree_node = tmp;
}
mtx.unlock();
}
Expand Down
29 changes: 18 additions & 11 deletions src/apex/taskstubs_implementation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

using maptype = std::unordered_map<tasktimer_guid_t,
std::shared_ptr<apex::task_wrapper>>;
std::mutex mtx;

std::mutex& mtx(void) {
static std::mutex mtx;
return mtx;
}

maptype& getCommonMap(void) {
static maptype theMap;
Expand All @@ -29,17 +33,18 @@ maptype& getMyMap(void) {
}

void safePrint(const char * format, tasktimer_guid_t guid) {
std::scoped_lock lock{mtx};
std::scoped_lock lock{mtx()};
printf("%lu %s GUID %lu\n", apex::thread_instance::get_id(), format, guid);
return;
}

void safeInsert(
tasktimer_guid_t guid,
std::shared_ptr<apex::task_wrapper> task) {
mtx.lock();
getCommonMap()[guid] = task;
mtx.unlock();
{
std::scoped_lock lock{mtx()};
getCommonMap()[guid] = task;
}
getMyMap()[guid] = task;
//safePrint("Inserted", guid);
}
Expand All @@ -50,9 +55,10 @@ std::shared_ptr<apex::task_wrapper> safeLookup(
auto task = getMyMap().find(guid);
if (task == getMyMap().end()) {
// in the common map?
std::scoped_lock lock{mtx};
task = getCommonMap().find(guid);
mtx.unlock();
{
std::scoped_lock lock{mtx()};
task = getCommonMap().find(guid);
}
if (task == getCommonMap().end()) {
safePrint("Not found", guid);
return nullptr;
Expand All @@ -68,9 +74,10 @@ void safeErase(
return;
/*
getMyMap().erase(guid);
mtx.lock();
getCommonMap().erase(guid);
mtx.unlock();
{
std::scoped_lock lock{mtx()};
getCommonMap().erase(guid);
}
//safePrint("Destroyed", guid);
*/
}
Expand Down
Loading

0 comments on commit 321ce48

Please sign in to comment.