Skip to content

Commit

Permalink
Adding initial support for multiple parents for Iris, PARSEc
Browse files Browse the repository at this point in the history
  • Loading branch information
khuck committed Aug 21, 2024
1 parent 2028aa7 commit 2d00c87
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 154 deletions.
138 changes: 89 additions & 49 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,8 @@ string& version() {
inline std::shared_ptr<task_wrapper> _new_task(
task_identifier * id,
const uint64_t task_id,
const std::shared_ptr<task_wrapper> parent_task, apex* instance) {
const std::vector<std::shared_ptr<task_wrapper>> parent_tasks,
apex* instance) {
in_apex prevent_deadlocks;
APEX_UNUSED(instance);
std::shared_ptr<task_wrapper> tt_ptr = make_shared<task_wrapper>();
Expand All @@ -643,17 +644,15 @@ inline std::shared_ptr<task_wrapper> _new_task(
!apex_options::use_otf2()) {
tt_ptr->parent = task_wrapper::get_apex_main_wrapper();
// was a parent passed in?
} else */ if (parent_task != nullptr) {
tt_ptr->parent_guid = parent_task->guid;
tt_ptr->parent = parent_task;
} else */ if (parent_tasks.size() > 0) {
tt_ptr->parents = parent_tasks;
// if not, is there a current timer?
} else {
profiler * p = thread_instance::instance().get_current_profiler();
if (p != nullptr) {
tt_ptr->parent_guid = p->guid;
tt_ptr->parent = p->tt_ptr;
tt_ptr->parents.push_back(p->tt_ptr);
} else {
tt_ptr->parent = task_wrapper::get_apex_main_wrapper();
tt_ptr->parents.push_back(task_wrapper::get_apex_main_wrapper());
}
}
if (apex_options::use_tasktree_output() || apex_options::use_hatchet_output()) {
Expand Down Expand Up @@ -722,7 +721,7 @@ profiler* start(const std::string &timer_name)
if (_notify_listeners) {
bool success = true;
task_identifier * id = task_identifier::get_task_id(timer_name);
tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance);
tt_ptr = _new_task(id, UINTMAX_MAX, {}, instance);
#if defined(APEX_DEBUG)//_disabled)
if (apex_options::use_verbose()) { debug_print("Start", tt_ptr); }
#endif
Expand Down Expand Up @@ -787,7 +786,7 @@ profiler* start(const apex_function_address function_address) {
if (_notify_listeners) {
bool success = true;
task_identifier * id = task_identifier::get_task_id(function_address);
tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance);
tt_ptr = _new_task(id, UINTMAX_MAX, {}, instance);
#if defined(APEX_DEBUG)//_disabled)
if (apex_options::use_verbose()) { debug_print("Start", tt_ptr); }
#endif
Expand Down Expand Up @@ -918,7 +917,7 @@ profiler* resume(const std::string &timer_name) {
std::shared_ptr<task_wrapper> tt_ptr(nullptr);
if (_notify_listeners) {
task_identifier * id = task_identifier::get_task_id(timer_name);
tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance);
tt_ptr = _new_task(id, UINTMAX_MAX, {}, instance);
APEX_UTIL_REF_COUNT_TASK_WRAPPER
try {
//read_lock_type l(instance->listener_mutex);
Expand Down Expand Up @@ -962,7 +961,7 @@ profiler* resume(const apex_function_address function_address) {
std::shared_ptr<task_wrapper> tt_ptr(nullptr);
if (_notify_listeners) {
task_identifier * id = task_identifier::get_task_id(function_address);
tt_ptr = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance);
tt_ptr = _new_task(id, UINTMAX_MAX, {}, instance);
APEX_UTIL_REF_COUNT_TASK_WRAPPER
try {
//read_lock_type l(instance->listener_mutex);
Expand Down Expand Up @@ -1397,36 +1396,68 @@ void sample_value(const std::string &name, double value, bool threaded)
}
}

#define APEX_CHECK_DISABLE \
/* if APEX is disabled, do nothing. */ \
if (apex_options::disable() == true) { \
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER \
return nullptr; \
}
#define APEX_CHECK_SUSPEND \
/* if APEX is suspended, do nothing. */ \
if (apex_options::suspend() == true) { \
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER \
return nullptr; \
}
#define APEX_CHECK_INTERNAL \
const std::string apex_internal("apex_internal"); \
if (starts_with(name, apex_internal)) { \
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER \
/* don't process our own events - queue scrubbing tasks. */ \
return nullptr; \
}
#define APEX_CHECK_INSTANCE \
/* get the Apex static instance */ \
apex* instance = apex::instance(); \
/* protect against calls after finalization */ \
if (!instance || _exited) { \
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER \
return nullptr; \
}

std::shared_ptr<task_wrapper> new_task(
const std::string &name,
const uint64_t task_id,
const std::shared_ptr<task_wrapper> parent_task)
{
in_apex prevent_deadlocks;
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr;
}
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr;
}
const std::string apex_internal("apex_internal");
if (starts_with(name, apex_internal)) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
// don't process our own events - queue scrubbing tasks.
return nullptr;
APEX_CHECK_DISABLE
APEX_CHECK_SUSPEND
APEX_CHECK_INTERNAL
APEX_CHECK_INSTANCE
task_identifier * id = task_identifier::get_task_id(name);
std::vector<std::shared_ptr<task_wrapper>> parents{};
if (parent_task != null_task_wrapper) {
parents.push_back(parent_task);
}
apex* instance = apex::instance(); // get the Apex static instance
if (!instance || _exited) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr;
} // protect against calls after finalization
std::shared_ptr<task_wrapper>
tt_ptr(_new_task(id, task_id, parents, instance));
APEX_UTIL_REF_COUNT_TASK_WRAPPER
return tt_ptr;
}

std::shared_ptr<task_wrapper> new_task(
const std::string &name,
const uint64_t task_id,
const std::vector<std::shared_ptr<task_wrapper>> parent_tasks)
{
in_apex prevent_deadlocks;
APEX_CHECK_DISABLE
APEX_CHECK_SUSPEND
APEX_CHECK_INTERNAL
APEX_CHECK_INSTANCE
task_identifier * id = task_identifier::get_task_id(name);
std::shared_ptr<task_wrapper>
tt_ptr(_new_task(id, task_id, parent_task, instance));
tt_ptr(_new_task(id, task_id, parent_tasks, instance));
APEX_UTIL_REF_COUNT_TASK_WRAPPER
return tt_ptr;
}
Expand All @@ -1436,23 +1467,32 @@ std::shared_ptr<task_wrapper> new_task(
const uint64_t task_id,
const std::shared_ptr<task_wrapper> parent_task) {
in_apex prevent_deadlocks;
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
// if APEX is suspended, do nothing.
if (apex_options::suspend() == true) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
// get the Apex static instance
apex* instance = apex::instance();
// protect against calls after finalization
if (!instance || _exited) {
APEX_UTIL_REF_COUNT_NULL_TASK_WRAPPER
return nullptr; }
APEX_CHECK_DISABLE
APEX_CHECK_SUSPEND
APEX_CHECK_INSTANCE
task_identifier * id = task_identifier::get_task_id(function_address);
std::vector<std::shared_ptr<task_wrapper>> parents{};
if (parent_task != null_task_wrapper) {
parents.push_back(parent_task);
}
std::shared_ptr<task_wrapper>
tt_ptr(_new_task(id, task_id, parent_task, instance));
tt_ptr(_new_task(id, task_id, parents, instance));
APEX_UTIL_REF_COUNT_TASK_WRAPPER
return tt_ptr;
}

std::shared_ptr<task_wrapper> new_task(
const apex_function_address function_address,
const uint64_t task_id,
const std::vector<std::shared_ptr<task_wrapper>> parent_tasks) {
in_apex prevent_deadlocks;
APEX_CHECK_DISABLE
APEX_CHECK_SUSPEND
APEX_CHECK_INSTANCE
task_identifier * id = task_identifier::get_task_id(function_address);
std::shared_ptr<task_wrapper>
tt_ptr(_new_task(id, task_id, parent_tasks, instance));
APEX_UTIL_REF_COUNT_TASK_WRAPPER
return tt_ptr;
}

Expand All @@ -1474,7 +1514,7 @@ std::shared_ptr<task_wrapper> update_task(
// protect against calls after finalization
if (!instance || _exited) { return nullptr; }
task_identifier * id = task_identifier::get_task_id(timer_name);
wrapper = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance);
wrapper = _new_task(id, UINTMAX_MAX, {}, instance);
} else {
task_identifier * id = task_identifier::get_task_id(timer_name);
// only have to do something if the ID has changed
Expand Down Expand Up @@ -1518,7 +1558,7 @@ std::shared_ptr<task_wrapper> update_task(
// protect against calls after finalization
if (!instance || _exited) { return nullptr; }
task_identifier * id = task_identifier::get_task_id(function_address);
wrapper = _new_task(id, UINTMAX_MAX, null_task_wrapper, instance);
wrapper = _new_task(id, UINTMAX_MAX, {}, instance);
} else {
task_identifier * id = task_identifier::get_task_id(function_address);
// only have to do something if the ID has changed
Expand Down
7 changes: 6 additions & 1 deletion src/apex/apex_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ APEX_EXPORT std::shared_ptr<task_wrapper> new_task(

APEX_EXPORT std::shared_ptr<task_wrapper> new_task(
const std::string &name,
const uint64_t task_id = UINTMAX_MAX,
const uint64_t task_id,
const std::vector<std::shared_ptr<apex::task_wrapper>> parent_tasks);

/**
Expand All @@ -380,6 +380,11 @@ APEX_EXPORT std::shared_ptr<task_wrapper> new_task(
const uint64_t task_id = UINTMAX_MAX,
const std::shared_ptr<apex::task_wrapper> parent_task = null_task_wrapper);

APEX_EXPORT std::shared_ptr<task_wrapper> new_task(
const apex_function_address function_address,
const uint64_t task_id,
const std::vector<std::shared_ptr<apex::task_wrapper>> parent_tasks);

/**
\brief Update a task (dependency).
Expand Down
45 changes: 33 additions & 12 deletions src/apex/dependency_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@ std::mutex Node::treeMutex;
std::atomic<size_t> Node::nodeCount{0};
std::set<std::string> Node::known_metrics;

Node* Node::appendChild(task_identifier* c) {
Node* Node::appendChild(task_identifier* c, Node* existing) {
treeMutex.lock();
auto iter = children.find(*c);
if (iter == children.end()) {
auto n = new Node(c,this);
//std::cout << "Inserting " << c->get_name() << std::endl;
children.insert(std::make_pair(*c,n));
treeMutex.unlock();
return n;
if (existing != nullptr) {
existing->parents.push_back(this);
children.insert(std::make_pair(*c,existing));
treeMutex.unlock();
return existing;
} else {
auto n = new Node(c,this);
//std::cout << "Inserting " << c->get_name() << std::endl;
children.insert(std::make_pair(*c,n));
treeMutex.unlock();
return n;
}
}
iter->second->count++;
treeMutex.unlock();
Expand Down Expand Up @@ -68,11 +75,16 @@ Node* Node::replaceChild(task_identifier* old_child, task_identifier* new_child)
}

void Node::writeNode(std::ofstream& outfile, double total) {
static std::set<Node*> processed;
if (processed.count(this)) return;
processed.insert(this);
static size_t depth = 0;
// Write out the relationships
if (parent != nullptr) {
outfile << " \"" << parent->getIndex() << "\" -> \"" << getIndex() << "\";";
outfile << std::endl;
for(auto& parent : parents) {
if (parent != nullptr) {
outfile << " \"" << parent->getIndex() << "\" -> \"" << getIndex() << "\";";
outfile << std::endl;
}
}

double acc = (data == task_identifier::get_main_task_id() || getAccumulated() == 0.0) ?
Expand Down Expand Up @@ -339,11 +351,20 @@ void Node::addAccumulated(double value, double incl, bool is_resume, uint64_t th

double Node::writeNodeCSV(std::stringstream& outfile, double total, int node_id, int num_papi_counters) {
static size_t depth = 0;
static std::set<Node*> processed;
if (processed.count(this)) return getAccumulated();
processed.insert(this);
APEX_ASSERT(total > 0.0);
// write out the node id and graph node index and the name
outfile << node_id << "," << index << ",";
outfile << ((parent == nullptr) ? 0 : parent->index) << ",";
outfile << depth << ",\"";
outfile << node_id << "," << index << ",\"[";
std::string delim("");
for (auto& parent : parents) {
if (parent != nullptr) {
outfile << delim << parent->index;
delim = ",";
}
}
outfile << "]\"," << depth << ",\"";
outfile << data->get_tree_name() << "\",";
// write out the accumulated
double acc = (data == task_identifier::get_main_task_id() || getAccumulated() == 0.0) ?
Expand Down
10 changes: 5 additions & 5 deletions src/apex/dependency_tree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class metricStorage {
class Node {
private:
task_identifier* data;
Node* parent;
std::vector<Node*> parents;
size_t count;
apex_profile prof;
//double calls;
Expand All @@ -67,8 +67,9 @@ class Node {
static std::set<std::string> known_metrics;
public:
Node(task_identifier* id, Node* p) :
data(id), parent(p), count(1), inclusive(0),
data(id), count(1), inclusive(0),
index(nodeCount.fetch_add(1, std::memory_order_relaxed)) {
parents.push_back(p);
prof.calls = 0.0;
prof.accumulated = 0.0;
prof.minimum = 0.0;
Expand All @@ -83,10 +84,9 @@ class Node {
}
treeMutex.unlock();
}
Node* appendChild(task_identifier* c);
Node* appendChild(task_identifier* c, Node* existing);
Node* replaceChild(task_identifier* old_child, task_identifier* new_child);
task_identifier* getData() { return data; }
Node* getParent() { return parent; }
size_t getCount() { return count; }
inline double& getCalls() { return prof.calls; }
inline double& getAccumulated() { return prof.accumulated; }
Expand All @@ -112,7 +112,7 @@ class Node {
}
// required for using this class as a key in a map, vector, etc.
static bool compareNodeByParentName (const Node* lhs, const Node* rhs) {
if (lhs->parent->index < rhs->parent->index) {
if (lhs->parents[0]->index < rhs->parents[0]->index) {
return true;
}
if (lhs->getName().compare(rhs->getName()) < 0) {
Expand Down
18 changes: 15 additions & 3 deletions src/apex/otf2_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,11 @@ namespace apex {
OTF2_AttributeList * al = OTF2_AttributeList_New();
// create an attribute
OTF2_AttributeList_AddUint64( al, 0, tt_ptr->guid );
OTF2_AttributeList_AddUint64( al, 1, tt_ptr->parent_guid );
std::vector<uint64_t> pguids = {};
for (auto& parent : tt_ptr->parents) {
pguids.push_back(parent->guid);
}
OTF2_AttributeList_AddUint64( al, pguids.size(), pguids.data() );
uint64_t idx = get_region_index(id);
uint64_t stamp = 0L;
if (thread_instance::get_id() == 0) {
Expand Down Expand Up @@ -1283,7 +1287,11 @@ namespace apex {
OTF2_AttributeList * al = OTF2_AttributeList_New();
// create an attribute
OTF2_AttributeList_AddUint64( al, 0, p->tt_ptr->guid );
OTF2_AttributeList_AddUint64( al, 1, p->tt_ptr->parent_guid );
std::vector<uint64_t> pguids = {};
for (auto& parent : tt_ptr->parents) {
pguids.push_back(parent->guid);
}
OTF2_AttributeList_AddUint64( al, pguids.size(), pguids.data() );
// unfortunately, we can't use the timestamp from the
// profiler object. bummer. it has to be taken after
// the lock is acquired, so that events happen on
Expand Down Expand Up @@ -2744,7 +2752,11 @@ namespace apex {
OTF2_AttributeList * al = OTF2_AttributeList_New();
// create an attribute
OTF2_AttributeList_AddUint64( al, 0, p->tt_ptr->guid );
OTF2_AttributeList_AddUint64( al, 1, p->tt_ptr->parent_guid );
std::vector<uint64_t> pguids = {};
for (auto& parent : tt_ptr->parents) {
pguids.push_back(parent->guid);
}
OTF2_AttributeList_AddUint64( al, pguids.size(), pguids.data() );
OTF2_EC(OTF2_EvtWriter_Enter( local_evt_writer, al,
stamp, idx /* region */ ));
stamp = p->get_stop_ns() - globalOffset;
Expand Down
Loading

0 comments on commit 2d00c87

Please sign in to comment.