5 changes: 3 additions & 2 deletions echion/coremodule.cc
@@ -54,7 +54,7 @@ static void do_where(std::ostream& stream)
auto interleave_success = interleave_stacks();
if (!interleave_success)
{
std::cerr << "could not interleave stacks" << std::endl;
// std::cerr <<"could not interleave stacks" << std::endl;
return;
}

@@ -131,7 +131,7 @@ static inline void _start()
do_where(pipe);

else
std::cerr << "Failed to open pipe " << pipe_name << std::endl;
// std::cerr <<"Failed to open pipe " << pipe_name << std::endl;

running = 0;

@@ -178,6 +178,7 @@ static inline void _stop()
const std::lock_guard<std::mutex> guard(thread_info_map_lock);

thread_info_map.clear();
string_table.dump(std::cerr);
string_table.clear();
}

4 changes: 2 additions & 2 deletions echion/danger.h
@@ -64,7 +64,7 @@ struct ThreadAltStack {

void* mem = mmap(nullptr, kAltStackSize, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
if (mem == MAP_FAILED) {
std::cerr << "Failed to allocate alt stack. Memory copying may not work." << " Error: " << strerror(errno) << std::endl;
// std::cerr <<"Failed to allocate alt stack. Memory copying may not work." << " Error: " << strerror(errno) << std::endl;
return -1;
}

@@ -73,7 +73,7 @@ struct ThreadAltStack {
ss.ss_size = kAltStackSize;
ss.ss_flags = 0;
if (sigaltstack(&ss, nullptr) != 0) {
std::cerr << "Failed to set alt stack. Memory copying may not work." << " Error: " << strerror(errno) << std::endl;
// std::cerr <<"Failed to set alt stack. Memory copying may not work." << " Error: " << strerror(errno) << std::endl;
return -1;
}

4 changes: 2 additions & 2 deletions echion/render.cc
@@ -7,7 +7,7 @@ void WhereRenderer::render_frame(Frame& frame)
auto maybe_name_str = string_table.lookup(frame.name);
if (!maybe_name_str)
{
std::cerr << "could not get name for render_frame" << std::endl;
// std::cerr <<"could not get name for render_frame" << std::endl;
return;
}
const auto& name_str = maybe_name_str->get();
@@ -16,7 +16,7 @@ void WhereRenderer::render_frame(Frame& frame)
auto maybe_filename_str = string_table.lookup(frame.filename);
if (!maybe_filename_str)
{
std::cerr << "could not get filename for render_frame" << std::endl;
// std::cerr <<"could not get filename for render_frame" << std::endl;
return;
}
const auto& filename_str = maybe_filename_str->get();
2 changes: 1 addition & 1 deletion echion/render.h
@@ -210,7 +210,7 @@ class MojoRenderer : public RendererInterface
output.open(std::getenv("ECHION_OUTPUT"));
if (!output.is_open())
{
std::cerr << "Failed to open output file " << std::getenv("ECHION_OUTPUT") << std::endl;
// std::cerr <<"Failed to open output file " << std::getenv("ECHION_OUTPUT") << std::endl;
return ErrorKind::RendererError;
}

4 changes: 2 additions & 2 deletions echion/stacks.h
@@ -281,7 +281,7 @@ static Result<void> interleave_stacks(FrameStack& python_stack)
{
// We expected a Python frame but we found none, so we report
// the native frame instead.
std::cerr << "Expected Python frame(s), found none!" << std::endl;
// std::cerr <<"Expected Python frame(s), found none!" << std::endl;
interleaved_stack.push_front(native_frame);
}
else
@@ -311,7 +311,7 @@ static Result<void> interleave_stacks(FrameStack& python_stack)

if (p != python_stack.rend())
{
std::cerr << "Python stack not empty after interleaving!" << std::endl;
// std::cerr <<"Python stack not empty after interleaving!" << std::endl;
while (p != python_stack.rend())
interleaved_stack.push_front(*p++);
}
9 changes: 9 additions & 0 deletions echion/strings.h
@@ -229,6 +229,15 @@ class StringTable : public std::unordered_map<uintptr_t, std::string>
this->emplace(UNKNOWN, "<unknown>");
};

void dump(std::ostream& os) const
{
os << "String table:" << std::endl;
for (const auto& [key, value] : *this)
{
os << " " << key << ": " << value << std::endl;
}
}

private:
mutable std::mutex table_lock;
};
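The dump helper above accepts any std::ostream, so it is not tied to the std::cerr call added in _stop(). A minimal usage sketch, assuming the global string_table instance from echion/strings.h (the output path is illustrative, not part of the change):

// Hypothetical usage: dump the string table to a file instead of stderr.
#include <fstream>

std::ofstream out("/tmp/echion_string_table.txt");  // illustrative path
if (out.is_open())
    string_table.dump(out);  // prints "String table:" then one "  key: value" line per entry

Note that dump does not take table_lock; in this sketch it is assumed to run after sampling has stopped, as in _stop().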
21 changes: 18 additions & 3 deletions echion/tasks.h
@@ -202,7 +202,7 @@ class TaskInfo
}

[[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
inline size_t unwind(FrameStack&, size_t& upper_python_stack_size);
inline Result<size_t> unwind(FrameStack&, size_t& upper_python_stack_size);
};

inline std::unordered_map<PyObject*, PyObject*> task_link_map;
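Changing unwind to return Result<size_t> lets an inconsistent Task be reported to the caller instead of silently miscounted. A minimal sketch of the Result<T> contract the call sites in threads.h rely on; this is an illustrative stand-in, not echion's actual definition (which also covers Result<void>):

// Illustrative stand-in for the Result<T> contract assumed below (non-void T only).
#include <optional>
#include <utility>

enum class ErrorKind { TaskInfoError, RendererError };  // assumed subset of echion's error kinds

template <typename T>
class Result
{
public:
    Result(T value) : value_(std::move(value)) {}  // success path
    Result(ErrorKind error) : error_(error) {}     // failure path, e.g. `return ErrorKind::TaskInfoError;`

    explicit operator bool() const { return value_.has_value(); }  // `if (!maybe_result)` detects failure
    T& operator*() { return *value_; }                             // unwrap only after the check

private:
    std::optional<T> value_{};
    std::optional<ErrorKind> error_{};
};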
@@ -360,39 +360,54 @@ inline std::vector<std::unique_ptr<StackInfo>> current_tasks;

// ----------------------------------------------------------------------------

inline size_t TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
inline Result<size_t> TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
{
// std::cerr <<"Unwinding the Task" << std::endl;
// TODO: Check for running task.
std::stack<PyObject*> coro_frames;

// Unwind the coro chain
for (auto coro = this->coro.get(); coro != NULL; coro = coro->await.get())
{
if (coro->frame != NULL)
if (coro->frame != NULL) {
coro_frames.push(coro->frame);
}
}

// Total number of frames added to the Stack
size_t count = 0;

// Unwind the coro frames
// std::cerr <<"Unwinding the coroutine frames, we have " << coro_frames.size() << " coro frames" << std::endl;
while (!coro_frames.empty())
{
PyObject* frame = coro_frames.top();
coro_frames.pop();

auto new_frames = unwind_frame(frame, stack);
// std::cerr <<"Unwound " << new_frames << " frames" << (new_frames > 1 ? " more than one" : "") << std::endl;
for (size_t i = 0; i < new_frames; i++) {
// std::cerr <<" " << i << ": " << string_table.lookup(stack[stack.size() - i - 1].get().name)->get() << std::endl;
}

// If this is the first Frame being unwound (we have not added any Frames to the Stack yet),
// use the number of Frames added to the Stack to determine the size of the upper Python stack.
if (count == 0) {
// The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1
upper_python_stack_size = new_frames - 1;
// std::cerr <<"Determining the size of the upper Python stack: " << upper_python_stack_size << std::endl;

if (this->is_on_cpu && upper_python_stack_size == 0) {
// std::cerr <<"Task is on CPU, but the upper Python stack size is 0. This is not possible." << std::endl;
return ErrorKind::TaskInfoError;
}

// Remove the Python Frames from the Stack (they will be added back later).
// We cannot push those Frames now because otherwise they would be added once per Task;
// we only want to add them once per Leaf Task, and on top of all non-leaf Tasks.
// std::cerr <<"Removing " << upper_python_stack_size << " frames from the Stack" << std::endl;
for (size_t i = 0; i < upper_python_stack_size; i++) {
// std::cerr <<" - " << string_table.lookup(stack[stack.size() - 1].get().name)->get() << std::endl;
stack.pop_back();
}
}
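A worked example of the upper-stack arithmetic in unwind, with made-up numbers: suppose unwinding the first coroutine frame pushes 5 frames, 4 synchronous ones (for instance main, run_forever, _run_once, Handle._run) plus the coroutine frame itself on top.

// Worked example (illustrative values only).
size_t new_frames = 5;                            // frames pushed for the first coro frame
size_t upper_python_stack_size = new_frames - 1;  // == 4: everything below the coroutine
// Those 4 synchronous frames are popped here and pushed back exactly once later,
// for the leaf Task only, rather than once per Task in the await chain.

This also shows why the new on-CPU check fires: a Task marked is_on_cpu must have been reached through at least one synchronous frame, so new_frames - 1 == 0 signals a torn sample.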
151 changes: 150 additions & 1 deletion echion/threads.h
@@ -235,6 +235,43 @@ inline void ThreadInfo::unwind(PyThreadState* tstate)
// ----------------------------------------------------------------------------
inline Result<void> ThreadInfo::unwind_tasks()
{
// std::cerr <<"===== Unwinding tasks ==" << std::endl;
// std::cerr <<"Python stack:" << std::endl;
for (size_t i = 0; i < python_stack.size(); i++)
{
// std::cerr <<" " << i << ": " << string_table.lookup(python_stack[i].get().name)->get() << std::endl;
}

// Check if the Python stack contains "_run".
// To avoid having to do string comparisons every time we unwind Tasks, we keep track
// of the cache key of the "_run" Frame.
static std::optional<Frame::Key> frame_cache_key;
bool expect_at_least_one_running_task = false;
if (!frame_cache_key) {
for (size_t i = 0; i < python_stack.size(); i++) {
const auto& frame = python_stack[i].get();
const auto& name = string_table.lookup(frame.name)->get();
if (name.size() >= 4 && name.rfind("_run") == name.size() - 4) {

// Although Frames are stored in an LRUCache, the cache key is ALWAYS the same
// even if the Frame gets evicted from the cache.
// This means we can keep the cache key and re-use it to determine
// whether we see the "_run" Frame in the Python stack.
frame_cache_key = frame.cache_key;
expect_at_least_one_running_task = true;
break;
}
}
} else {
for (size_t i = 0; i < python_stack.size(); i++) {
const auto& frame = python_stack[i].get();
if (frame.cache_key == *frame_cache_key) {
expect_at_least_one_running_task = true;
break;
}
}
}

std::vector<TaskInfo::Ref> leaf_tasks;
std::unordered_set<PyObject*> parent_tasks;
std::unordered_map<PyObject*, TaskInfo::Ref> waitee_map; // Indexed by task origin
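The detection above keys off frame names ending in "_run" (asyncio runs Task steps through Handle._run). An equivalent standalone predicate, with a hypothetical helper name, for illustration:

// Hypothetical helper equivalent to the suffix test above.
#include <string>

static bool ends_with_run(const std::string& name)
{
    return name.size() >= 4 && name.rfind("_run") == name.size() - 4;
}
// ends_with_run("Handle._run") -> true; ends_with_run("run") -> false.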
@@ -247,6 +284,23 @@ inline Result<void> ThreadInfo::unwind_tasks()
}

auto all_tasks = std::move(*maybe_all_tasks);

bool saw_at_least_one_running_task = false;
std::string running_task_name;
for (const auto& task_ref : all_tasks) {
const auto& task = task_ref.get();
if (task->is_on_cpu) {
running_task_name = string_table.lookup(task->name)->get();
saw_at_least_one_running_task = true;
break;
}
}

if (saw_at_least_one_running_task != expect_at_least_one_running_task) {
// std::cerr <<"SKIPSKIP because of inconsistent task state, expected: " << expect_at_least_one_running_task << " saw: " << saw_at_least_one_running_task << " running task: " << running_task_name << std::endl;
return ErrorKind::TaskInfoError;
}

{
std::lock_guard<std::mutex> lock(task_link_map_lock);

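The cross-check above compares two independently derived signals. As read from the branch above, the four combinations are:

// Comment-only summary of the consistency check above.
//   expect=true,  saw=true   -> consistent: a "_run" frame on the stack and a Task on CPU
//   expect=false, saw=false  -> consistent: idle event loop (e.g. blocked in select)
//   expect=true,  saw=false  -> inconsistent sample, bail out with ErrorKind::TaskInfoError
//   expect=false, saw=true   -> inconsistent sample, bail out with ErrorKind::TaskInfoError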
@@ -284,6 +338,7 @@ inline Result<void> ThreadInfo::unwind_tasks()
{
// This task is not running, so we skip it if we are
// interested in just CPU time.
// std::cerr <<"SKIPPING TASK " << string_table.lookup(task->name)->get() << " because it is not on CPU" << std::endl;
continue;
}
leaf_tasks.push_back(std::ref(*task));
Expand All @@ -295,6 +350,71 @@ inline Result<void> ThreadInfo::unwind_tasks()
return ((a.get().is_on_cpu ? 0 : 1) < (b.get().is_on_cpu ? 0 : 1));
});

// std::cerr <<"Leaf tasks:" << std::endl;
// for (const auto& leaf_task : leaf_tasks) {
// std::cerr <<" " << string_table.lookup(leaf_task.get().name)->get() << " on cpu: " << leaf_task.get().is_on_cpu << std::endl;
// }

// Print a tree of Task dependencies
// std::cerr <<"Task tree:" << std::endl;
{
// Build parent->children maps
std::unordered_map<PyObject*, std::vector<PyObject*>> children_map; // parent -> children
std::unordered_set<PyObject*> has_parent;

// From gather links
{
std::lock_guard<std::mutex> lock(task_link_map_lock);
for (const auto& kv : task_link_map) {
children_map[kv.second].push_back(kv.first);
has_parent.insert(kv.first);
}
}

// From waiter relationships (waiter is the parent, current task is child)
for (const auto& task : all_tasks) {
if (task->waiter) {
children_map[task->waiter->origin].push_back(task->origin);
has_parent.insert(task->origin);
}
}

// Find roots (tasks with no parent)
std::vector<PyObject*> roots;
for (const auto& task : all_tasks) {
if (has_parent.find(task->origin) == has_parent.end()) {
roots.push_back(task->origin);
}
}

// Recursive print function
std::function<void(PyObject*, const std::string&, bool)> print_tree;
print_tree = [&](PyObject* task_origin, const std::string& prefix, bool is_last) {
auto it = origin_map.find(task_origin);
if (it == origin_map.end()) return;

const auto& task = it->second.get();
auto maybe_name = string_table.lookup(task.name);
std::string name_str = maybe_name ? maybe_name->get() : "<unknown>";

// std::cerr <<prefix << (is_last ? "└── " : "├── ") << name_str << " (" << task_origin << ")" << (task.is_on_cpu ? " [ON CPU]" : "") << std::endl;

auto children_it = children_map.find(task_origin);
if (children_it != children_map.end()) {
const auto& children = children_it->second;
for (size_t i = 0; i < children.size(); i++) {
print_tree(children[i], prefix + (is_last ? " " : "│ "), i == children.size() - 1);
}
}
};

for (size_t i = 0; i < roots.size(); i++) {
print_tree(roots[i], "", i == roots.size() - 1);
}
}



// The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind
size_t upper_python_stack_size = 0;
// Unused variable, will be used later by TaskInfo::unwind
@@ -303,16 +423,24 @@ inline Result<void> ThreadInfo::unwind_tasks()
bool on_cpu_task_seen = false;
for (auto& leaf_task : leaf_tasks)
{
// std::cerr <<"== Unwinding leaf task: " << string_table.lookup(leaf_task.get().name)->get() << std::endl;
auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, leaf_task.get().is_on_cpu);
on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu;

auto& stack = stack_info->stack;
for (auto current_task = leaf_task;;)
{
auto& task = current_task.get();
// std::cerr <<"= Unwinding task: " << string_table.lookup(task.name)->get() << " on cpu: " << task.is_on_cpu << std::endl;

// The task_stack_size includes both the coroutine frames and the "upper" Python synchronous frames
size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
auto maybe_task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
if (!maybe_task_stack_size) {
// std::cerr <<"SKIPSKIP inconsistent Task " << string_table.lookup(task.name)->get() << std::endl;
return ErrorKind::TaskInfoError;
}

size_t task_stack_size = std::move(*maybe_task_stack_size);
if (task.is_on_cpu)
{
// Get the "bottom" part of the Python synchronous Stack, that is to say the
Expand All @@ -321,15 +449,25 @@ inline Result<void> ThreadInfo::unwind_tasks()
// subtract the number of Frames in the "upper Python stack" (asyncio machinery + sync entrypoint)
// This gives us [outermost coroutine, ... , innermost coroutine, outermost sync function, ... , innermost sync function]
size_t frames_to_push = (python_stack.size() > task_stack_size) ? python_stack.size() - task_stack_size : 0;
// std::cerr <<"Task is on CPU, pushing " << frames_to_push << " frames" << std::endl;
for (size_t i = 0; i < frames_to_push; i++)
{
const auto& python_frame = python_stack[frames_to_push - i - 1];
// std::cerr <<" " << i << ": " << string_table.lookup(python_frame.get().name)->get() << std::endl;
stack.push_front(python_frame);
}
} else {
// std::cerr <<"Task is not on CPU..." << std::endl;
}

// Add the task name frame
stack.push_back(Frame::get(task.name));
// std::cerr <<"Pushed task name frame: " << string_table.lookup(task.name)->get() << std::endl;

// std::cerr <<"Stack at the end for that Task" << std::endl;
for (size_t i = 0; i < stack.size(); i++) {
// std::cerr <<" " << i << ": " << string_table.lookup(stack[i].get().name)->get() << std::endl;
}

// Get the next task in the chain
PyObject* task_origin = task.origin;
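For the frames_to_push computation above, a worked example with made-up sizes: a thread stack of 7 Python frames and a Task chain that contributed 4 frames leaves 3 outer synchronous frames to prepend.

// Worked example (illustrative sizes only).
size_t python_stack_size = 7;  // frames on the thread's Python stack
size_t task_stack_size = 4;    // coroutine frames + upper sync frames from the Task
size_t frames_to_push = (python_stack_size > task_stack_size)
                            ? python_stack_size - task_stack_size
                            : 0;                   // == 3, pushed to the front of the stack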
@@ -357,11 +495,22 @@ inline Result<void> ThreadInfo::unwind_tasks()
// Finish off with the remaining thread stack
// If we have seen an on-CPU Task, then upper_python_stack_size will be set and will include the sync entry point
// and the asyncio machinery Frames. Otherwise, we are in `select` (idle) and we should push all the Frames.
// std::cerr <<"Pushing the remaining Thread stack" << std::endl;
// for (size_t i = 0; i < python_stack.size() - (on_cpu_task_seen ? upper_python_stack_size : python_stack.size()); i++) {
// const auto& python_frame = python_stack[i];
// // std::cerr <<" Skipped: " << i << ": " << string_table.lookup(python_frame.get().name)->get() << std::endl;
// }
for (size_t i = python_stack.size() - (on_cpu_task_seen ? upper_python_stack_size : python_stack.size()); i < python_stack.size(); i++) {
const auto& python_frame = python_stack[i];
// std::cerr <<" Pushed: " << i << ": " << string_table.lookup(python_frame.get().name)->get() << std::endl;
stack.push_back(python_frame);
}

// std::cerr <<"Stack after pushing the remaining Thread stack" << std::endl;
for (size_t i = 0; i < stack.size(); i++) {
// std::cerr <<" " << i << ": " << string_table.lookup(stack[i].get().name)->get() << std::endl;
}

current_tasks.push_back(std::move(stack_info));
}
