Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions echion/core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def init_asyncio(
threads: list, scheduled_tasks: set, eager_tasks: set | None
) -> None: ...
# Link two tasks: associates `child` with `parent` in the native task link map.
# NOTE(review): presumably used for gather-style (strong) links — confirm
# against the C++ implementation in coremodule.cc.
def link_tasks(parent: Task, child: Future) -> None: ...
# Weakly link two tasks: the native implementation records `child -> parent`
# in its weak task link map, which the sampler consults when no waitee or
# strong link is found for a task.
def weak_link_tasks(parent: Task, child: Future) -> None: ...


# Greenlet support
def track_greenlet(
Expand Down
20 changes: 20 additions & 0 deletions echion/coremodule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,25 @@ static PyObject* link_tasks(PyObject* Py_UNUSED(m), PyObject* args)
Py_RETURN_NONE;
}

// ----------------------------------------------------------------------------
/// Record a weak (creator -> created) link between two tasks.
///
/// Python signature: ``weak_link_tasks(parent, child) -> None``.
///
/// @param args  Positional argument tuple holding exactly two objects: the
///              parent task followed by the child task.
/// @return      ``None`` on success; ``nullptr`` (with the Python exception set
///              by ``PyArg_ParseTuple``) when the arguments cannot be parsed.
///
/// The object addresses are stored as plain map keys/values; no reference is
/// taken on either object here. Entries whose key no longer matches a known
/// task origin are pruned by the sampler while unwinding tasks.
static PyObject* weak_link_tasks(PyObject* Py_UNUSED(m), PyObject* args)
{
    PyObject* parent = nullptr;
    PyObject* child = nullptr;

    if (!PyArg_ParseTuple(args, "OO", &parent, &child))
        return nullptr;

    {
        // The weak link map shares its mutex with task_link_map. Keep the
        // critical section to the single map update: no I/O under the lock.
        std::lock_guard<std::mutex> guard(task_link_map_lock);

        weak_task_link_map[child] = parent;
    }

    Py_RETURN_NONE;
}

// ----------------------------------------------------------------------------
static PyMethodDef echion_core_methods[] = {
{"start", start, METH_NOARGS, "Start the stack sampler"},
Expand All @@ -519,6 +538,7 @@ static PyMethodDef echion_core_methods[] = {
"Map the name of a task with its identifier"},
{"init_asyncio", init_asyncio, METH_VARARGS, "Initialise asyncio tracking"},
{"link_tasks", link_tasks, METH_VARARGS, "Link two tasks"},
{"weak_link_tasks", weak_link_tasks, METH_VARARGS, "Weakly link two tasks"},
// Greenlet support
{"track_greenlet", track_greenlet, METH_VARARGS, "Map a greenlet with its identifier"},
{"untrack_greenlet", untrack_greenlet, METH_VARARGS, "Untrack a terminated greenlet"},
Expand Down
20 changes: 20 additions & 0 deletions echion/monkey/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,31 @@ def as_completed(

# -----------------------------------------------------------------------------

_create_task = tasks.create_task


@wraps(_create_task)
def create_task(coro, *, name: t.Optional[str] = None, **kwargs: t.Any) -> asyncio.Task[t.Any]:
    # Delegate the actual creation to the original asyncio implementation.
    # Extra keyword arguments are forwarded untouched; they typically carry
    # context (Python 3.11+ only) and eager_start (Python 3.14+ only).
    new_task = _create_task(coro, name=name, **kwargs)

    # The task running right now (if any) is the one that spawned the new task.
    creator: t.Optional[asyncio.Task] = tasks.current_task()
    if creator is None:
        return new_task

    # Remember the creator -> created relationship so that the sampler can
    # attach the new task under its creator even when it is never awaited.
    echion.weak_link_tasks(creator, new_task)
    return new_task


# -----------------------------------------------------------------------------

def patch() -> None:
    """Replace the stock asyncio entry points with the echion wrappers."""
    # One-off replacements that live on a single object.
    BaseDefaultEventLoopPolicy.set_event_loop = set_event_loop  # type: ignore[method-assign]
    tasks._GatheringFuture.__init__ = gather  # type: ignore[attr-defined]
    tasks._wait = wait  # type: ignore[attr-defined]

    # Helpers exposed both on asyncio.tasks and on the asyncio package itself
    # must be replaced in both places (tasks first, then asyncio).
    for namespace in (tasks, asyncio):
        namespace.as_completed = as_completed  # type: ignore[attr-defined,assignment]
    for namespace in (tasks, asyncio):
        namespace.create_task = create_task  # type: ignore[attr-defined,assignment]


def unpatch() -> None:
Expand All @@ -99,6 +117,8 @@ def unpatch() -> None:
tasks._wait = _wait # type: ignore[attr-defined]
tasks.as_completed = _as_completed # type: ignore[attr-defined]
asyncio.as_completed = _as_completed
tasks.create_task = _create_task # type: ignore[attr-defined]
asyncio.create_task = _create_task # type: ignore[attr-defined]


def track() -> None:
Expand Down
1 change: 1 addition & 0 deletions echion/tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class TaskInfo
};

// Task links keyed by a task object's address; the mapped value is the task
// treated as its parent when walking up the task chain (e.g. gather links).
inline std::unordered_map<PyObject*, PyObject*> task_link_map;
// Weak links registered through weak_link_tasks(): child -> parent. Consulted
// during unwinding after the waitee and task_link_map lookups found nothing.
inline std::unordered_map<PyObject*, PyObject*> weak_task_link_map;
// Mutex held while weak_link_tasks() updates weak_task_link_map and while the
// unwinder looks up task_link_map.
// NOTE(review): confirm that every access to both maps is made under it.
inline std::mutex task_link_map_lock;

// ----------------------------------------------------------------------------
Expand Down
62 changes: 57 additions & 5 deletions echion/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,38 @@ inline Result<void> ThreadInfo::unwind_tasks()
for (auto key : to_remove)
task_link_map.erase(key);

// Determine the parent tasks from the gather links.
std::transform(task_link_map.cbegin(), task_link_map.cend(),
std::inserter(parent_tasks, parent_tasks.begin()),
[](const std::pair<PyObject*, PyObject*>& kv) { return kv.second; });
// Clean up the weak_task_link_map.
// Remove entries associated to tasks that no longer exist.
all_task_origins.clear();
std::transform(all_tasks.cbegin(), all_tasks.cend(),
std::inserter(all_task_origins, all_task_origins.begin()),
[](const TaskInfo::Ptr& task) { return task->origin; });

to_remove.clear();
for (auto kv : weak_task_link_map)
{
if (all_task_origins.find(kv.first) == all_task_origins.end())
to_remove.push_back(kv.first);
}

for (auto key : to_remove) {
weak_task_link_map.erase(key);
}

// Determine the parent tasks from the gather (strong) links.
for (auto& link : task_link_map)
{
auto parent = link.second;

// Check if the parent is actually the child of another Task
auto is_child = weak_task_link_map.find(parent) != weak_task_link_map.end();

// Only insert if we do not know of a Task that created the current Task
if (!is_child)
{
parent_tasks.insert(parent);
}
}
}

for (auto& task : all_tasks)
Expand Down Expand Up @@ -297,6 +325,7 @@ inline Result<void> ThreadInfo::unwind_tasks()
for (auto current_task = leaf_task;;)
{
auto& task = current_task.get();
std::cerr << "Unwinding task " << string_table.lookup(task.name)->get() << std::endl;

size_t stack_size = task.unwind(stack);

Expand Down Expand Up @@ -324,29 +353,52 @@ inline Result<void> ThreadInfo::unwind_tasks()
}
}

std::cerr << "Stack after unwinding task " << string_table.lookup(task.name)->get() << std::endl;
for (auto& frame : stack) {
std::cerr << " " << string_table.lookup(frame.get().name)->get() << std::endl;
}

// Add the task name frame
stack.push_back(Frame::get(task.name));

auto task_name = string_table.lookup(task.name)->get();
std::cerr << "Searching for Task Link from " << task_name << std::endl;

// Get the next task in the chain
PyObject* task_origin = task.origin;
if (waitee_map.find(task_origin) != waitee_map.end())
{
std::cerr << "found waitee link from " << task_origin << " to " << waitee_map.find(task_origin)->second.get().name << std::endl;
current_task = waitee_map.find(task_origin)->second;
continue;
}


{

// Check for, e.g., gather links
std::lock_guard<std::mutex> lock(task_link_map_lock);

if (task_link_map.find(task_origin) != task_link_map.end() &&
origin_map.find(task_link_map[task_origin]) != origin_map.end())
{
std::cerr << "found strong link from " << task_origin << " to " << task_link_map[task_origin] << std::endl;
current_task = origin_map.find(task_link_map[task_origin])->second;
continue;
}
}

// Check for weak links
if (weak_task_link_map.find(task_origin) != weak_task_link_map.end() &&
origin_map.find(weak_task_link_map[task_origin]) != origin_map.end())
{
std::cerr << "found weak link from " << task_origin << " to " << weak_task_link_map[task_origin] << std::endl;
current_task = origin_map.find(weak_task_link_map[task_origin])->second;
std::cerr << "-> current_task = " << string_table.lookup(current_task.get().name)->get() << std::endl;
continue;
}
}

std::cerr << "no link found from " << task_origin << std::endl;
break;
}

Expand Down
29 changes: 29 additions & 0 deletions tests/target_async_not_awaited.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio


async def func_not_awaited() -> None:
    """Sleep for half a second; runs in the task that `parent` never awaits."""
    short_delay = 0.5
    await asyncio.sleep(short_delay)


async def func_awaited() -> None:
    """Sleep for one second; runs in the task that `parent` explicitly awaits."""
    long_delay = 1
    await asyncio.sleep(long_delay)


async def parent() -> asyncio.Task:
    """Spawn two child tasks but only await the longer-running one.

    Returns the task that was never awaited so that it is not flagged as an
    unused variable.
    """
    # Creation order matters for scheduling: the fire-and-forget task first.
    background = asyncio.create_task(func_not_awaited(), name="Task-not_awaited")
    awaited = asyncio.create_task(func_awaited(), name="Task-awaited")

    await awaited

    # `background` was never awaited, but its delay is much shorter than the
    # awaited task's, so it should already have finished by now.
    return background


def main():
    """Drive the `parent` coroutine to completion with asyncio.run."""
    root_coroutine = parent()
    asyncio.run(root_coroutine)


if __name__ == "__main__":
    main()
62 changes: 62 additions & 0 deletions tests/test_asyncio_not_awaited.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import sys

from tests.utils import PY, DataSummary, run_target, dump_summary


def test_asyncio_not_awaited_wall_time() -> None:
    """Check that a task which is created but never awaited is still reported
    underneath the task that created it."""
    proc, profile = run_target("target_async_not_awaited")
    assert proc.returncode == 0, proc.stderr.decode()

    assert profile is not None
    metadata = profile.metadata
    assert metadata["mode"] == "wall"
    assert metadata["interval"] == "1000"

    summary = DataSummary(profile)
    dump_summary(summary, "summary_asyncio_not_awaited.json")

    thread = "0:MainThread"

    def has_samples(value) -> bool:
        return value > 0.0

    # Task-1 / parent / Task-not_awaited / func_not_awaited / sleep:
    # Task-1 never awaits Task-not_awaited, but the weak (parent - child) link
    # recorded at creation time places Task-not_awaited under Task-1.
    not_awaited_stack = (
        "Task-1",
        "parent",
        "Task-not_awaited",
        "func_not_awaited",
        "sleep",
    )
    summary.assert_substack(thread, not_awaited_stack, has_samples)

    # Task-1 / parent / Task-awaited / func_awaited / sleep:
    # expected because Task-1 is awaiting Task-awaited.
    awaited_stack = (
        "Task-1",
        "parent",
        "Task-awaited",
        "func_awaited",
        "sleep",
    )
    summary.assert_substack(thread, awaited_stack, has_samples)

    # Task-not_awaited must never appear directly under the event loop's
    # select frame (i.e. without Task-1 above it); that would mean the
    # parent-child links are not being followed.
    if PY >= (3, 11):
        selector_prefix = "KqueueSelector" if sys.platform == "darwin" else "EpollSelector"
    else:
        selector_prefix = ""

    orphan_stack = (
        selector_prefix + "select",
        "Task-not_awaited",
        "func_not_awaited",
        "sleep",
    )
    summary.assert_not_substack(thread, orphan_stack)