diff --git a/echion/monkey/asyncio.py b/echion/monkey/asyncio.py index 5d381256..71ceb3b7 100644 --- a/echion/monkey/asyncio.py +++ b/echion/monkey/asyncio.py @@ -74,8 +74,9 @@ def as_completed( loop = t.cast(t.Optional[asyncio.AbstractEventLoop], kwargs.get("loop")) parent: t.Optional[asyncio.Task] = tasks.current_task(loop) + futures: set[asyncio.Future] = set(fs) if parent is not None: - futures: set[asyncio.Future] = {asyncio.ensure_future(f, loop=loop) for f in set(fs)} + futures = {asyncio.ensure_future(f, loop=loop) for f in set(fs)} for child in futures: echion.link_tasks(parent, child) diff --git a/echion/tasks.h b/echion/tasks.h index 76ebe0e9..47da8829 100644 --- a/echion/tasks.h +++ b/echion/tasks.h @@ -202,7 +202,7 @@ class TaskInfo } [[nodiscard]] static Result current(PyObject*); - inline size_t unwind(FrameStack&, size_t& upper_python_stack_size); + inline Result unwind(FrameStack&, size_t& upper_python_stack_size); }; inline std::unordered_map task_link_map; @@ -360,7 +360,7 @@ inline std::vector> current_tasks; // ---------------------------------------------------------------------------- -inline size_t TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size) +inline Result TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size) { // TODO: Check for running task. std::stack coro_frames; @@ -395,6 +395,10 @@ inline size_t TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_siz // The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1 upper_python_stack_size = new_frames - 1; + if (this->is_on_cpu && upper_python_stack_size == 0) { + return ErrorKind::TaskInfoError; + } + // Remove the Python Frames from the Stack (they will be added back later) // We cannot push those Frames now because otherwise they would be added once per Task, // we only want to add them once per Leaf Task, and on top of all non-leaf Tasks. 
diff --git a/echion/threads.h b/echion/threads.h index 351da98b..a4935a66 100644 --- a/echion/threads.h +++ b/echion/threads.h @@ -235,10 +235,55 @@ inline void ThreadInfo::unwind(PyThreadState* tstate) // ---------------------------------------------------------------------------- inline Result ThreadInfo::unwind_tasks() { + // Check if the Python stack contains "_run". + // To avoid having to do string comparisons every time we unwind Tasks, we keep track + // of the cache key of the "_run" Frame. + static std::optional frame_cache_key; + bool expect_at_least_one_running_task = false; + if (!frame_cache_key) { + for (size_t i = 0; i < python_stack.size(); i++) { + const auto& frame = python_stack[i].get(); + const auto& name = string_table.lookup(frame.name)->get(); + +#if PY_VERSION_HEX >= 0x030b0000 + // From Python 3.11 onwards, function names in Frames are qualified with e.g. the class name, so we + // can use the qualified name to identify the "_run" Frame. + constexpr std::string_view _run = "Handle._run"; + auto is_run_frame = name.size() >= _run.size() && name == _run; +#else + // Before Python 3.11, function names in Frames are not qualified, so we + // match the filename suffix in addition to the function name to identify the "_run" Frame. + constexpr std::string_view asyncio_runners_py = "asyncio/events.py"; + constexpr std::string_view _run = "_run"; + auto filename = string_table.lookup(frame.filename)->get(); + auto is_asyncio = filename.size() >= asyncio_runners_py.size() && filename.rfind(asyncio_runners_py) == filename.size() - asyncio_runners_py.size(); + auto is_run_frame = is_asyncio && (name.size() >= _run.size() && name.rfind(_run) == name.size() - _run.size()); +#endif + if (is_run_frame) { + // Although Frames are stored in an LRUCache, the cache key is ALWAYS the same + // even if the Frame gets evicted from the cache. + // This means we can keep the cache key and re-use it to determine + // whether we see the "_run" Frame in the Python stack. 
+ frame_cache_key = frame.cache_key; + expect_at_least_one_running_task = true; + break; + } + } + } else { + for (size_t i = 0; i < python_stack.size(); i++) { + const auto& frame = python_stack[i].get(); + if (frame.cache_key == *frame_cache_key) { + expect_at_least_one_running_task = true; + break; + } + } + } + std::vector leaf_tasks; std::unordered_set parent_tasks; std::unordered_map waitee_map; // Indexed by task origin std::unordered_map origin_map; // Indexed by task origin + static std::unordered_set previous_task_objects; auto maybe_all_tasks = get_all_tasks(reinterpret_cast(asyncio_loop)); if (!maybe_all_tasks) @@ -247,6 +292,22 @@ inline Result ThreadInfo::unwind_tasks() } auto all_tasks = std::move(*maybe_all_tasks); + + bool saw_at_least_one_running_task = false; + std::string running_task_name; + for (const auto& task_ref : all_tasks) { + const auto& task = task_ref.get(); + if (task->is_on_cpu) { + running_task_name = string_table.lookup(task->name)->get(); + saw_at_least_one_running_task = true; + break; + } + } + + if (saw_at_least_one_running_task != expect_at_least_one_running_task) { + return ErrorKind::TaskInfoError; + } + { std::lock_guard lock(task_link_map_lock); @@ -263,13 +324,24 @@ inline Result ThreadInfo::unwind_tasks() if (all_task_origins.find(kv.first) == all_task_origins.end()) to_remove.push_back(kv.first); } - for (auto key : to_remove) - task_link_map.erase(key); + for (auto key : to_remove) { + // Only remove the link if the Child Task previously existed; otherwise it's a Task that + // has just been created and that wasn't in all_tasks when we took the snapshot. + if (previous_task_objects.find(key) != previous_task_objects.end()) { + task_link_map.erase(key); + } + } // Determine the parent tasks from the gather links. 
std::transform(task_link_map.cbegin(), task_link_map.cend(), std::inserter(parent_tasks, parent_tasks.begin()), [](const std::pair& kv) { return kv.second; }); + + // Copy all Task object pointers into previous_task_objects + previous_task_objects.clear(); + for (const auto& task : all_tasks) { + previous_task_objects.insert(task->origin); + } } for (auto& task : all_tasks) @@ -317,7 +389,12 @@ inline Result ThreadInfo::unwind_tasks() auto& task = current_task.get(); // The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames - size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused); + auto maybe_task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused); + if (!maybe_task_stack_size) { + return ErrorKind::TaskInfoError; + } + + size_t task_stack_size = std::move(*maybe_task_stack_size); if (task.is_on_cpu) { // Get the "bottom" part of the Python synchronous Stack, that is to say the