diff --git a/echion/core.pyi b/echion/core.pyi index 03e8921f..fcbbc4f1 100644 --- a/echion/core.pyi +++ b/echion/core.pyi @@ -21,6 +21,8 @@ def init_asyncio( threads: list, scheduled_tasks: set, eager_tasks: set | None ) -> None: ... def link_tasks(parent: Task, child: Future) -> None: ... +def weak_link_tasks(parent: Task, child: Future) -> None: ... + # Greenlet support def track_greenlet( diff --git a/echion/coremodule.cc b/echion/coremodule.cc index a1ccdf09..0be10acb 100644 --- a/echion/coremodule.cc +++ b/echion/coremodule.cc @@ -506,6 +506,25 @@ static PyObject* link_tasks(PyObject* Py_UNUSED(m), PyObject* args) Py_RETURN_NONE; } +// ---------------------------------------------------------------------------- +static PyObject* weak_link_tasks(PyObject* Py_UNUSED(m), PyObject* args) +{ + PyObject *parent, *child; + + if (!PyArg_ParseTuple(args, "OO", &parent, &child)) + return NULL; + + + { + std::lock_guard guard(task_link_map_lock); + + std::cerr << "setting weak link from " << child << " to " << parent << std::endl; + weak_task_link_map[child] = parent; + } + + Py_RETURN_NONE; +} + // ---------------------------------------------------------------------------- static PyMethodDef echion_core_methods[] = { {"start", start, METH_NOARGS, "Start the stack sampler"}, @@ -519,6 +538,7 @@ static PyMethodDef echion_core_methods[] = { "Map the name of a task with its identifier"}, {"init_asyncio", init_asyncio, METH_VARARGS, "Initialise asyncio tracking"}, {"link_tasks", link_tasks, METH_VARARGS, "Link two tasks"}, + {"weak_link_tasks", weak_link_tasks, METH_VARARGS, "Weakly link two tasks"}, // Greenlet support {"track_greenlet", track_greenlet, METH_VARARGS, "Map a greenlet with its identifier"}, {"untrack_greenlet", untrack_greenlet, METH_VARARGS, "Untrack a terminated greenlet"}, diff --git a/echion/monkey/asyncio.py b/echion/monkey/asyncio.py index 5d381256..cdcf294e 100644 --- a/echion/monkey/asyncio.py +++ b/echion/monkey/asyncio.py @@ -84,6 +84,22 @@ def as_completed( # ----------------------------------------------------------------------------- +_create_task = tasks.create_task + + +@wraps(_create_task) +def create_task(coro, *, name: t.Optional[str] = None, **kwargs: t.Any) -> asyncio.Task[t.Any]: + # kwargs will typically contain context (Python 3.11+ only) and eager_start (Python 3.14+ only) + task = _create_task(coro, name=name, **kwargs) + parent: t.Optional[asyncio.Task] = tasks.current_task() + + if parent is not None: + echion.weak_link_tasks(parent, task) + + return task + + +# ----------------------------------------------------------------------------- def patch() -> None: BaseDefaultEventLoopPolicy.set_event_loop = set_event_loop # type: ignore[method-assign] @@ -91,6 +107,8 @@ def patch() -> None: tasks._wait = wait # type: ignore[attr-defined] tasks.as_completed = as_completed # type: ignore[attr-defined,assignment] asyncio.as_completed = as_completed # type: ignore[attr-defined,assignment] + tasks.create_task = create_task # type: ignore[attr-defined,assignment] + asyncio.create_task = create_task # type: ignore[attr-defined,assignment] def unpatch() -> None: @@ -99,6 +117,8 @@ def unpatch() -> None: tasks._wait = _wait # type: ignore[attr-defined] tasks.as_completed = _as_completed # type: ignore[attr-defined] asyncio.as_completed = _as_completed + tasks.create_task = _create_task # type: ignore[attr-defined] + asyncio.create_task = _create_task # type: ignore[attr-defined] def track() -> None: diff --git a/echion/tasks.h b/echion/tasks.h index 7e6ce664..cf74c179 100644 --- a/echion/tasks.h +++ b/echion/tasks.h @@ -206,6 +206,7 @@ class TaskInfo }; inline std::unordered_map task_link_map; +inline std::unordered_map weak_task_link_map; inline std::mutex task_link_map_lock; // ---------------------------------------------------------------------------- diff --git a/echion/threads.h b/echion/threads.h index 4d10e40a..9090af81 100644 --- a/echion/threads.h +++ b/echion/threads.h @@ -266,10 +266,38 @@ inline Result ThreadInfo::unwind_tasks() for (auto key : to_remove) task_link_map.erase(key); - // Determine the parent tasks from the gather links. - std::transform(task_link_map.cbegin(), task_link_map.cend(), - std::inserter(parent_tasks, parent_tasks.begin()), - [](const std::pair& kv) { return kv.second; }); + // Clean up the weak_task_link_map. + // Remove entries associated to tasks that no longer exist. + all_task_origins.clear(); + std::transform(all_tasks.cbegin(), all_tasks.cend(), + std::inserter(all_task_origins, all_task_origins.begin()), + [](const TaskInfo::Ptr& task) { return task->origin; }); + + to_remove.clear(); + for (auto kv : weak_task_link_map) + { + if (all_task_origins.find(kv.first) == all_task_origins.end()) + to_remove.push_back(kv.first); + } + + for (auto key : to_remove) { + weak_task_link_map.erase(key); + } + + // Determine the parent tasks from the gather (strong) links. + for (auto& link : task_link_map) + { + auto parent = link.second; + + // Check if the parent is actually the child of another Task + auto is_child = weak_task_link_map.find(parent) != weak_task_link_map.end(); + + // Only insert if we do not know of a Task that created the current Task + if (!is_child) + { + parent_tasks.insert(parent); + } + } } for (auto& task : all_tasks) @@ -297,6 +325,7 @@ inline Result ThreadInfo::unwind_tasks() for (auto current_task = leaf_task;;) { auto& task = current_task.get(); + std::cerr << "Unwinding task " << string_table.lookup(task.name)->get() << std::endl; size_t stack_size = task.unwind(stack); @@ -324,29 +353,52 @@ inline Result ThreadInfo::unwind_tasks() } } + std::cerr << "Stack after unwinding task " << string_table.lookup(task.name)->get() << std::endl; + for (auto& frame : stack) { + std::cerr << " " << string_table.lookup(frame.get().name)->get() << std::endl; + } + // Add the task name frame stack.push_back(Frame::get(task.name)); + auto task_name = string_table.lookup(task.name)->get(); + std::cerr << "Searching for Task Link from " << task_name << std::endl; + // Get the next task in the chain PyObject* task_origin = task.origin; if (waitee_map.find(task_origin) != waitee_map.end()) { + std::cerr << "found waitee link from " << task_origin << " to " << waitee_map.find(task_origin)->second.get().name << std::endl; current_task = waitee_map.find(task_origin)->second; continue; } + { + // Check for, e.g., gather links std::lock_guard lock(task_link_map_lock); if (task_link_map.find(task_origin) != task_link_map.end() && origin_map.find(task_link_map[task_origin]) != origin_map.end()) { + std::cerr << "found strong link from " << task_origin << " to " << task_link_map[task_origin] << std::endl; current_task = origin_map.find(task_link_map[task_origin])->second; continue; } - } + // Check for weak links + if (weak_task_link_map.find(task_origin) != weak_task_link_map.end() && + origin_map.find(weak_task_link_map[task_origin]) != origin_map.end()) + { + std::cerr << "found weak link from " << task_origin << " to " << weak_task_link_map[task_origin] << std::endl; + current_task = origin_map.find(weak_task_link_map[task_origin])->second; + std::cerr << "-> current_task = " << string_table.lookup(current_task.get().name)->get() << std::endl; + continue; + } + } + + std::cerr << "no link found from " << task_origin << std::endl; break; } diff --git a/tests/target_async_not_awaited.py b/tests/target_async_not_awaited.py new file mode 100644 index 00000000..13d19efb --- /dev/null +++ b/tests/target_async_not_awaited.py @@ -0,0 +1,29 @@ +import asyncio + + +async def func_not_awaited() -> None: + await asyncio.sleep(0.5) + + +async def func_awaited() -> None: + await asyncio.sleep(1) + + +async def parent() -> asyncio.Task: + t_not_awaited = asyncio.create_task(func_not_awaited(), name="Task-not_awaited") + t_awaited = asyncio.create_task(func_awaited(), name="Task-awaited") + + await t_awaited + + # At this point, we have not awaited t_not_awaited but it should have finished + # before t_awaited as the delay is much shorter. + # Returning it to avoid the warning on unused variable. + return t_not_awaited + + +def main(): + asyncio.run(parent()) + + +if __name__ == "__main__": + main() diff --git a/tests/test_asyncio_not_awaited.py b/tests/test_asyncio_not_awaited.py new file mode 100644 index 00000000..26e24948 --- /dev/null +++ b/tests/test_asyncio_not_awaited.py @@ -0,0 +1,62 @@ +import sys + +from tests.utils import PY, DataSummary, run_target, dump_summary + + +def test_asyncio_not_awaited_wall_time() -> None: + result, data = run_target("target_async_not_awaited") + assert result.returncode == 0, result.stderr.decode() + + assert data is not None + md = data.metadata + assert md["mode"] == "wall" + assert md["interval"] == "1000" + + summary = DataSummary(data) + dump_summary(summary, "summary_asyncio_not_awaited.json") + + # We should see a stack for Task-1 / parent / Task-not_awaited / not_awaited / sleep + # Even though Task-1 does not await Task-not_awaited, the fact that there is a weak (parent - child) link + # means that Task-not_awaited is under Task-1. + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "parent", + "Task-not_awaited", + "func_not_awaited", + "sleep", + ), + lambda v: v > 0.0, + ) + + # We should see a stack for Task-1 / parent / Task-awaited / awaited / sleep + # That is because Task-1 is awaiting Task-awaited. + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "parent", + "Task-awaited", + "func_awaited", + "sleep", + ), + lambda v: v > 0.0, + ) + + # We should never see the Task-not_awaited Frame without Task-1 up in the Stack + # or it would mean we are not properly following parent-child links. + summary.assert_not_substack( + "0:MainThread", + ( + ( + ("KqueueSelector" if sys.platform == "darwin" else "EpollSelector") + if PY >= (3, 11) + else "" + ) + + "select", + "Task-not_awaited", + "func_not_awaited", + "sleep", + ), + )