Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions echion/tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,23 +186,23 @@ class TaskInfo
PyObject* origin = NULL;
PyObject* loop = NULL;

bool is_on_cpu = false;
GenInfo::Ptr coro = nullptr;

StringTable::Key name;
bool is_on_cpu = false;

// Information to reconstruct the async stack as best as we can
TaskInfo::Ptr waiter = nullptr;

[[nodiscard]] static Result<TaskInfo::Ptr> create(TaskObj*);
TaskInfo(PyObject* origin, PyObject* loop, GenInfo::Ptr coro, StringTable::Key name,
TaskInfo::Ptr waiter)
: origin(origin), loop(loop), coro(std::move(coro)), name(name), is_on_cpu(coro ? coro->is_running : false), waiter(std::move(waiter))
: origin(origin), loop(loop), is_on_cpu(coro ? coro->is_running : false), coro(std::move(coro)), name(name), waiter(std::move(waiter))
{
}

[[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
inline size_t unwind(FrameStack&);
inline size_t unwind(FrameStack&, size_t& upper_python_stack_size);
};

inline std::unordered_map<PyObject*, PyObject*> task_link_map;
Expand Down Expand Up @@ -360,7 +360,7 @@ inline std::vector<std::unique_ptr<StackInfo>> current_tasks;

// ----------------------------------------------------------------------------

inline size_t TaskInfo::unwind(FrameStack& stack)
inline size_t TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
{
// TODO: Check for running task.
std::stack<PyObject*> coro_frames;
Expand All @@ -372,15 +372,38 @@ inline size_t TaskInfo::unwind(FrameStack& stack)
coro_frames.push(coro->frame);
}

int count = 0;
// Total number of frames added to the Stack
size_t count = 0;

// Unwind the coro frames
while (!coro_frames.empty())
{
PyObject* frame = coro_frames.top();
coro_frames.pop();

count += unwind_frame(frame, stack);
auto new_frames = unwind_frame(frame, stack);

// If we failed to unwind the Frame, stop unwinding the coroutine chain; otherwise we could
// end up with Stacks with missing Frames between two coroutines Frames.
if (new_frames == 0) {
break;
}

// If this is the first Frame being unwound (we have not added any Frames to the Stack yet),
// use the number of Frames added to the Stack to determine the size of the upper Python stack.
if (count == 0) {
// The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1
upper_python_stack_size = new_frames - 1;

// Remove the Python Frames from the Stack (they will be added back later)
// We cannot push those Frames now because otherwise they would be added once per Task,
// we only want to add them once per Leaf Task, and on top of all non-leaf Tasks.
for (size_t i = 0; i < upper_python_stack_size; i++) {
stack.pop_back();
}
}

count += new_frames;
}

return count;
Expand Down
65 changes: 43 additions & 22 deletions echion/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,37 +290,46 @@ inline Result<void> ThreadInfo::unwind_tasks()
}
}

// Make sure the on CPU task is first
for (size_t i = 0; i < leaf_tasks.size(); i++) {
if (leaf_tasks[i].get().is_on_cpu) {
if (i > 0) {
std::swap(leaf_tasks[i], leaf_tasks[0]);
}
break;
}
}

// The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind
size_t upper_python_stack_size = 0;
// Unused variable, will be used later by TaskInfo::unwind
size_t unused;

bool on_cpu_task_seen = false;
for (auto& leaf_task : leaf_tasks)
{
auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, leaf_task.get().is_on_cpu);
on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu;

auto& stack = stack_info->stack;
for (auto current_task = leaf_task;;)
{
auto& task = current_task.get();

size_t stack_size = task.unwind(stack);

// The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames
size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
if (task.is_on_cpu)
{
// Undo the stack unwinding
// TODO[perf]: not super-efficient :(
for (size_t i = 0; i < stack_size; i++)
stack.pop_back();

// Instead we get part of the thread stack
FrameStack temp_stack;
size_t nframes =
(python_stack.size() > stack_size) ? python_stack.size() - stack_size : 0;
for (size_t i = 0; i < nframes; i++)
{
auto python_frame = python_stack.front();
temp_stack.push_front(python_frame);
python_stack.pop_front();
}
while (!temp_stack.empty())
// Get the "bottom" part of the Python synchronous Stack, that is to say the
// synchronous functions and coroutines called by the Task's outermost coroutine
// The number of Frames to push is the total number of Frames in the Python stack, from which we
// subtract the number of Frames in the "upper Python stack" (asyncio machinery + sync entrypoint)
// This gives us [outermost coroutine, ... , innermost coroutine, outermost sync function, ... , innermost sync function]
size_t frames_to_push = (python_stack.size() > task_stack_size) ? python_stack.size() - task_stack_size : 0;
for (size_t i = 0; i < frames_to_push; i++)
{
stack.push_front(temp_stack.front());
temp_stack.pop_front();
const auto& python_frame = python_stack[frames_to_push - i - 1];
stack.push_front(python_frame);
}
}

Expand Down Expand Up @@ -351,8 +360,20 @@ inline Result<void> ThreadInfo::unwind_tasks()
}

// Finish off with the remaining thread stack
for (auto p = python_stack.begin(); p != python_stack.end(); p++)
stack.push_back(*p);
// If we have seen an on-CPU Task, then upper_python_stack_size will be set and will include the sync entry point
// and the asyncio machinery Frames. Otherwise, we are in `select` (idle) and we should push all the Frames.

// There could be a race condition where relevant partial Python Thread Stack ends up being different from the
// one we saw in TaskInfo::unwind. This is extremely unlikely, I believe, but failing to account for it would
// cause an underflow, so let's be conservative.
size_t start_index = 0;
if (on_cpu_task_seen && python_stack.size() >= upper_python_stack_size) {
start_index = python_stack.size() - upper_python_stack_size;
}
for (size_t i = start_index; i < python_stack.size(); i++) {
const auto& python_frame = python_stack[i];
stack.push_back(python_frame);
}

current_tasks.push_back(std::move(stack_info));
}
Expand Down
8 changes: 4 additions & 4 deletions tests/target_async_coroutines.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@


async def outer_function():
async def background_task_func() -> None:
async def background_wait_function() -> None:
await asyncio.sleep(2.5)

async def background_math_function() -> None:
s = 0.0
for i in range(100000):
s += math.sin(i)

background_task = loop.create_task(background_task_func(), name="background_wait")
math_task = loop.create_task(background_math_function(), name="background_math")
background_task = loop.create_task(background_wait_function(), name="Task-background_wait")
math_task = loop.create_task(background_math_function(), name="Task-background_math")
assert background_task is not None

sleep_time = 0.2
Expand All @@ -41,5 +41,5 @@ async def main_coro():

return result

main_task = loop.create_task(outer_function(), name="main")
main_task = loop.create_task(outer_function(), name="Task-main")
loop.run_until_complete(main_task)
27 changes: 27 additions & 0 deletions tests/target_asyncio_idle_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio


async def inner2() -> None:
await asyncio.sleep(1)


async def inner1() -> None:
t = asyncio.create_task(inner2())

await t


async def outer():
await inner1()


async def async_main():
await outer()


def main_sync():
asyncio.run(async_main())


if __name__ == "__main__":
main_sync()
43 changes: 43 additions & 0 deletions tests/target_asyncio_recursive_on_cpu_coros.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import asyncio
import time


def sync_code() -> int:
target = time.time() + 1
result = 0
while time.time() < target:
result += 1

return result


def sync_code_outer() -> int:
return sync_code()


async def inner3() -> int:
return sync_code_outer()


async def inner2() -> int:
return await inner3()


async def inner1() -> int:
return await inner2()


async def outer():
return await inner1()


async def async_main():
return await outer()


def main_sync():
asyncio.run(async_main())


if __name__ == "__main__":
main_sync()
45 changes: 45 additions & 0 deletions tests/target_asyncio_recursive_on_cpu_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import asyncio
import time


def sync_code() -> int:
target = time.time() + 1
result = 0
while time.time() < target:
result += 1

return result


def sync_code_outer() -> int:
return sync_code()


async def inner3() -> int:
return sync_code_outer()


async def inner2() -> int:
return await inner3()


async def inner1() -> int:
t = asyncio.create_task(inner2())

return await t


async def outer():
return await inner1()


async def async_main():
return await outer()


def main_sync():
asyncio.run(async_main())


if __name__ == "__main__":
main_sync()
36 changes: 36 additions & 0 deletions tests/target_asyncio_within_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
import time


def synchronous_code_dep() -> None:
time.sleep(0.25)


def synchronous_code() -> None:
synchronous_code_dep()


async def inner() -> None:
synchronous_code()


async def outer() -> None:
await inner()
await asyncio.sleep(0.25)


async def async_main() -> None:
await outer()


def async_starter() -> None:
asyncio.run(async_main())


def sync_main() -> None:
async_starter()
time.sleep(0.25)


if __name__ == "__main__":
sync_main()
2 changes: 1 addition & 1 deletion tests/target_bytecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def replace_bytecode():
if __name__ == "__main__":
Thread(target=replace_bytecode, daemon=True).start()

end = time() + 5
end = time() + 2

while time() < end:
foo(12)
Loading
Loading