Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion echion/monkey/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ def as_completed(
loop = t.cast(t.Optional[asyncio.AbstractEventLoop], kwargs.get("loop"))
parent: t.Optional[asyncio.Task] = tasks.current_task(loop)

futures: set[asyncio.Future] = set(fs)
if parent is not None:
futures: set[asyncio.Future] = {asyncio.ensure_future(f, loop=loop) for f in set(fs)}
futures = {asyncio.ensure_future(f, loop=loop) for f in set(fs)}
for child in futures:
echion.link_tasks(parent, child)

Expand Down
8 changes: 6 additions & 2 deletions echion/tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class TaskInfo
}

[[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
inline size_t unwind(FrameStack&, size_t& upper_python_stack_size);
inline Result<size_t> unwind(FrameStack&, size_t& upper_python_stack_size);
};

inline std::unordered_map<PyObject*, PyObject*> task_link_map;
Expand Down Expand Up @@ -360,7 +360,7 @@ inline std::vector<std::unique_ptr<StackInfo>> current_tasks;

// ----------------------------------------------------------------------------

inline size_t TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
inline Result<size_t> TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
{
// TODO: Check for running task.
std::stack<PyObject*> coro_frames;
Expand Down Expand Up @@ -395,6 +395,10 @@ inline size_t TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_siz
// The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1
upper_python_stack_size = new_frames - 1;

if (this->is_on_cpu && upper_python_stack_size == 0) {
return ErrorKind::TaskInfoError;
}

// Remove the Python Frames from the Stack (they will be added back later)
// We cannot push those Frames now because otherwise they would be added once per Task,
// we only want to add them once per Leaf Task, and on top of all non-leaf Tasks.
Expand Down
83 changes: 80 additions & 3 deletions echion/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,55 @@ inline void ThreadInfo::unwind(PyThreadState* tstate)
// ----------------------------------------------------------------------------
inline Result<void> ThreadInfo::unwind_tasks()
{
// Check if the Python stack contains "_run".
// To avoid having to do string comparisons every time we unwind Tasks, we keep track
// of the cache key of the "_run" Frame.
static std::optional<Frame::Key> frame_cache_key;
bool expect_at_least_one_running_task = false;
if (!frame_cache_key) {
for (size_t i = 0; i < python_stack.size(); i++) {
const auto& frame = python_stack[i].get();
const auto& name = string_table.lookup(frame.name)->get();

#if PY_VERSION_HEX >= 0x030b0000
// After Python 3.11, function names in Frames are qualified with e.g. the class name, so we
// can use the qualified name to identify the "_run" Frame.
constexpr std::string_view _run = "Handle._run";
auto is_run_frame = name.size() >= _run.size() && name == _run;
#else
// Before Python 3.11, function names in Frames are not qualified, so we
// can use the filename to identify the "_run" Frame.
constexpr std::string_view asyncio_runners_py = "asyncio/events.py";
constexpr std::string_view _run = "_run";
auto filename = string_table.lookup(frame.filename)->get();
auto is_asyncio = filename.size() >= asyncio_runners_py.size() && filename.rfind(asyncio_runners_py) == filename.size() - asyncio_runners_py.size();
auto is_run_frame = is_asyncio && (name.size() >= _run.size() && name.rfind(_run) == name.size() - _run.size());
#endif
if (is_run_frame) {
// Although Frames are stored in an LRUCache, the cache key is ALWAYS the same
// even if the Frame gets evicted from the cache.
// This means we can keep the cache key and re-use it to determine
// whether we see the "_run" Frame in the Python stack.
frame_cache_key = frame.cache_key;
expect_at_least_one_running_task = true;
break;
}
}
} else {
for (size_t i = 0; i < python_stack.size(); i++) {
const auto& frame = python_stack[i].get();
if (frame.cache_key == *frame_cache_key) {
expect_at_least_one_running_task = true;
break;
}
}
}

std::vector<TaskInfo::Ref> leaf_tasks;
std::unordered_set<PyObject*> parent_tasks;
std::unordered_map<PyObject*, TaskInfo::Ref> waitee_map; // Indexed by task origin
std::unordered_map<PyObject*, TaskInfo::Ref> origin_map; // Indexed by task origin
static std::unordered_set<PyObject*> previous_task_objects;

auto maybe_all_tasks = get_all_tasks(reinterpret_cast<PyObject*>(asyncio_loop));
if (!maybe_all_tasks)
Expand All @@ -247,6 +292,22 @@ inline Result<void> ThreadInfo::unwind_tasks()
}

auto all_tasks = std::move(*maybe_all_tasks);

bool saw_at_least_one_running_task = false;
std::string running_task_name;
for (const auto& task_ref : all_tasks) {
const auto& task = task_ref.get();
if (task->is_on_cpu) {
running_task_name = string_table.lookup(task->name)->get();
saw_at_least_one_running_task = true;
break;
}
}

if (saw_at_least_one_running_task != expect_at_least_one_running_task) {
return ErrorKind::TaskInfoError;
}

{
std::lock_guard<std::mutex> lock(task_link_map_lock);

Expand All @@ -263,13 +324,24 @@ inline Result<void> ThreadInfo::unwind_tasks()
if (all_task_origins.find(kv.first) == all_task_origins.end())
to_remove.push_back(kv.first);
}
for (auto key : to_remove)
task_link_map.erase(key);
for (auto key : to_remove) {
// Only remove the link if the Child Task previously existed; otherwise it's a Task that
// has just been created and that wasn't in all_tasks when we took the snapshot.
if (previous_task_objects.find(key) != previous_task_objects.end()) {
task_link_map.erase(key);
}
}

// Determine the parent tasks from the gather links.
std::transform(task_link_map.cbegin(), task_link_map.cend(),
std::inserter(parent_tasks, parent_tasks.begin()),
[](const std::pair<PyObject*, PyObject*>& kv) { return kv.second; });

// Copy all Task object pointers into previous_task_objects
previous_task_objects.clear();
for (const auto& task : all_tasks) {
previous_task_objects.insert(task->origin);
}
}

for (auto& task : all_tasks)
Expand Down Expand Up @@ -317,7 +389,12 @@ inline Result<void> ThreadInfo::unwind_tasks()
auto& task = current_task.get();

// The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames
size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
auto maybe_task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
if (!maybe_task_stack_size) {
return ErrorKind::TaskInfoError;
}

size_t task_stack_size = std::move(*maybe_task_stack_size);
if (task.is_on_cpu)
{
// Get the "bottom" part of the Python synchronous Stack, that is to say the
Expand Down
Loading