diff --git a/echion/tasks.h b/echion/tasks.h index 7e6ce664..76ebe0e9 100644 --- a/echion/tasks.h +++ b/echion/tasks.h @@ -186,10 +186,10 @@ class TaskInfo PyObject* origin = NULL; PyObject* loop = NULL; + bool is_on_cpu = false; GenInfo::Ptr coro = nullptr; StringTable::Key name; - bool is_on_cpu = false; // Information to reconstruct the async stack as best as we can TaskInfo::Ptr waiter = nullptr; @@ -197,12 +197,12 @@ class TaskInfo [[nodiscard]] static Result create(TaskObj*); TaskInfo(PyObject* origin, PyObject* loop, GenInfo::Ptr coro, StringTable::Key name, TaskInfo::Ptr waiter) - : origin(origin), loop(loop), coro(std::move(coro)), name(name), is_on_cpu(coro ? coro->is_running : false), waiter(std::move(waiter)) + : origin(origin), loop(loop), is_on_cpu(coro ? coro->is_running : false), coro(std::move(coro)), name(name), waiter(std::move(waiter)) { } [[nodiscard]] static Result current(PyObject*); - inline size_t unwind(FrameStack&); + inline size_t unwind(FrameStack&, size_t& upper_python_stack_size); }; inline std::unordered_map task_link_map; @@ -360,7 +360,7 @@ inline std::vector> current_tasks; // ---------------------------------------------------------------------------- -inline size_t TaskInfo::unwind(FrameStack& stack) +inline size_t TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size) { // TODO: Check for running task. std::stack coro_frames; @@ -372,7 +372,8 @@ inline size_t TaskInfo::unwind(FrameStack& stack) coro_frames.push(coro->frame); } - int count = 0; + // Total number of frames added to the Stack + size_t count = 0; // Unwind the coro frames while (!coro_frames.empty()) @@ -380,7 +381,29 @@ inline size_t TaskInfo::unwind(FrameStack& stack) PyObject* frame = coro_frames.top(); coro_frames.pop(); - count += unwind_frame(frame, stack); + auto new_frames = unwind_frame(frame, stack); + + // If we failed to unwind the Frame, stop unwinding the coroutine chain; otherwise we could + // end up with Stacks with missing Frames between two coroutines Frames. + if (new_frames == 0) { + break; + } + + // If this is the first Frame being unwound (we have not added any Frames to the Stack yet), + // use the number of Frames added to the Stack to determine the size of the upper Python stack. + if (count == 0) { + // The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1 + upper_python_stack_size = new_frames - 1; + + // Remove the Python Frames from the Stack (they will be added back later) + // We cannot push those Frames now because otherwise they would be added once per Task, + // we only want to add them once per Leaf Task, and on top of all non-leaf Tasks. + for (size_t i = 0; i < upper_python_stack_size; i++) { + stack.pop_back(); + } + } + + count += new_frames; } return count; diff --git a/echion/threads.h b/echion/threads.h index 4d10e40a..351da98b 100644 --- a/echion/threads.h +++ b/echion/threads.h @@ -290,37 +290,46 @@ inline Result ThreadInfo::unwind_tasks() } } + // Make sure the on CPU task is first + for (size_t i = 0; i < leaf_tasks.size(); i++) { + if (leaf_tasks[i].get().is_on_cpu) { + if (i > 0) { + std::swap(leaf_tasks[i], leaf_tasks[0]); + } + break; + } + } + + // The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind + size_t upper_python_stack_size = 0; + // Unused variable, will be used later by TaskInfo::unwind + size_t unused; + + bool on_cpu_task_seen = false; for (auto& leaf_task : leaf_tasks) { auto stack_info = std::make_unique(leaf_task.get().name, leaf_task.get().is_on_cpu); + on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu; + auto& stack = stack_info->stack; for (auto current_task = leaf_task;;) { auto& task = current_task.get(); - size_t stack_size = task.unwind(stack); - + // The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames + size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused); if (task.is_on_cpu) { - // Undo the stack unwinding - // TODO[perf]: not super-efficient :( - for (size_t i = 0; i < stack_size; i++) - stack.pop_back(); - - // Instead we get part of the thread stack - FrameStack temp_stack; - size_t nframes = - (python_stack.size() > stack_size) ? python_stack.size() - stack_size : 0; - for (size_t i = 0; i < nframes; i++) - { - auto python_frame = python_stack.front(); - temp_stack.push_front(python_frame); - python_stack.pop_front(); - } - while (!temp_stack.empty()) + // Get the "bottom" part of the Python synchronous Stack, that is to say the + // synchronous functions and coroutines called by the Task's outermost coroutine + // The number of Frames to push is the total number of Frames in the Python stack, from which we + // subtract the number of Frames in the "upper Python stack" (asyncio machinery + sync entrypoint) + // This gives us [outermost coroutine, ... , innermost coroutine, outermost sync function, ... , innermost sync function] + size_t frames_to_push = (python_stack.size() > task_stack_size) ? python_stack.size() - task_stack_size : 0; + for (size_t i = 0; i < frames_to_push; i++) { - stack.push_front(temp_stack.front()); - temp_stack.pop_front(); + const auto& python_frame = python_stack[frames_to_push - i - 1]; + stack.push_front(python_frame); } } @@ -351,8 +360,20 @@ inline Result ThreadInfo::unwind_tasks() } // Finish off with the remaining thread stack - for (auto p = python_stack.begin(); p != python_stack.end(); p++) - stack.push_back(*p); + // If we have seen an on-CPU Task, then upper_python_stack_size will be set and will include the sync entry point + // and the asyncio machinery Frames. Otherwise, we are in `select` (idle) and we should push all the Frames. + + // There could be a race condition where relevant partial Python Thread Stack ends up being different from the + // one we saw in TaskInfo::unwind. This is extremely unlikely, I believe, but failing to account for it would + // cause an underflow, so let's be conservative. + size_t start_index = 0; + if (on_cpu_task_seen && python_stack.size() >= upper_python_stack_size) { + start_index = python_stack.size() - upper_python_stack_size; + } + for (size_t i = start_index; i < python_stack.size(); i++) { + const auto& python_frame = python_stack[i]; + stack.push_back(python_frame); + } current_tasks.push_back(std::move(stack_info)); } diff --git a/tests/target_async_coroutines.py b/tests/target_async_coroutines.py index 19be117a..1f3b76c4 100644 --- a/tests/target_async_coroutines.py +++ b/tests/target_async_coroutines.py @@ -10,7 +10,7 @@ async def outer_function(): - async def background_task_func() -> None: + async def background_wait_function() -> None: await asyncio.sleep(2.5) async def background_math_function() -> None: @@ -18,8 +18,8 @@ async def background_math_function() -> None: for i in range(100000): s += math.sin(i) - background_task = loop.create_task(background_task_func(), name="background_wait") - math_task = loop.create_task(background_math_function(), name="background_math") + background_task = loop.create_task(background_wait_function(), name="Task-background_wait") + math_task = loop.create_task(background_math_function(), name="Task-background_math") assert background_task is not None sleep_time = 0.2 @@ -41,5 +41,5 @@ async def main_coro(): return result -main_task = loop.create_task(outer_function(), name="main") +main_task = loop.create_task(outer_function(), name="Task-main") loop.run_until_complete(main_task) diff --git a/tests/target_asyncio_idle_wait.py b/tests/target_asyncio_idle_wait.py new file mode 100644 index 00000000..bf65883a --- /dev/null +++ b/tests/target_asyncio_idle_wait.py @@ -0,0 +1,27 @@ +import asyncio + + +async def inner2() -> None: + await asyncio.sleep(1) + + +async def inner1() -> None: + t = asyncio.create_task(inner2()) + + await t + + +async def outer(): + await inner1() + + +async def async_main(): + await outer() + + +def main_sync(): + asyncio.run(async_main()) + + +if __name__ == "__main__": + main_sync() diff --git a/tests/target_asyncio_recursive_on_cpu_coros.py b/tests/target_asyncio_recursive_on_cpu_coros.py new file mode 100644 index 00000000..535c3039 --- /dev/null +++ b/tests/target_asyncio_recursive_on_cpu_coros.py @@ -0,0 +1,43 @@ +import asyncio +import time + + +def sync_code() -> int: + target = time.time() + 1 + result = 0 + while time.time() < target: + result += 1 + + return result + + +def sync_code_outer() -> int: + return sync_code() + + +async def inner3() -> int: + return sync_code_outer() + + +async def inner2() -> int: + return await inner3() + + +async def inner1() -> int: + return await inner2() + + +async def outer(): + return await inner1() + + +async def async_main(): + return await outer() + + +def main_sync(): + asyncio.run(async_main()) + + +if __name__ == "__main__": + main_sync() diff --git a/tests/target_asyncio_recursive_on_cpu_tasks.py b/tests/target_asyncio_recursive_on_cpu_tasks.py new file mode 100644 index 00000000..4ecc2481 --- /dev/null +++ b/tests/target_asyncio_recursive_on_cpu_tasks.py @@ -0,0 +1,45 @@ +import asyncio +import time + + +def sync_code() -> int: + target = time.time() + 1 + result = 0 + while time.time() < target: + result += 1 + + return result + + +def sync_code_outer() -> int: + return sync_code() + + +async def inner3() -> int: + return sync_code_outer() + + +async def inner2() -> int: + return await inner3() + + +async def inner1() -> int: + t = asyncio.create_task(inner2()) + + return await t + + +async def outer(): + return await inner1() + + +async def async_main(): + return await outer() + + +def main_sync(): + asyncio.run(async_main()) + + +if __name__ == "__main__": + main_sync() diff --git a/tests/target_asyncio_within_function.py b/tests/target_asyncio_within_function.py new file mode 100644 index 00000000..6162fa91 --- /dev/null +++ b/tests/target_asyncio_within_function.py @@ -0,0 +1,36 @@ +import asyncio +import time + + +def synchronous_code_dep() -> None: + time.sleep(0.25) + + +def synchronous_code() -> None: + synchronous_code_dep() + + +async def inner() -> None: + synchronous_code() + + +async def outer() -> None: + await inner() + await asyncio.sleep(0.25) + + +async def async_main() -> None: + await outer() + + +def async_starter() -> None: + asyncio.run(async_main()) + + +def sync_main() -> None: + async_starter() + time.sleep(0.25) + + +if __name__ == "__main__": + sync_main() diff --git a/tests/target_bytecode.py b/tests/target_bytecode.py index f65cd60a..0de8f922 100644 --- a/tests/target_bytecode.py +++ b/tests/target_bytecode.py @@ -23,7 +23,7 @@ def replace_bytecode(): if __name__ == "__main__": Thread(target=replace_bytecode, daemon=True).start() - end = time() + 5 + end = time() + 2 while time() < end: foo(12) diff --git a/tests/test_asyncio_as_completed.py b/tests/test_asyncio_as_completed.py index 542c1308..c42784c5 100644 --- a/tests/test_asyncio_as_completed.py +++ b/tests/test_asyncio_as_completed.py @@ -1,4 +1,6 @@ -from tests.utils import PY, DataSummary, run_target, retry_on_valueerror +import json + +from tests.utils import PY, DataSummary, run_target, retry_on_valueerror, dump_summary, summary_to_json @retry_on_valueerror() @@ -13,76 +15,64 @@ def test_asyncio_as_completed(): summary = DataSummary(data) - summary_json = {} - for thread in summary.threads: - summary_json[thread] = [ - { - "stack": key, - "metric": value, - } - for key, value in summary.threads[thread].items() - if key and isinstance(next(iter(key)), str) - ] - - with open("summary.json", "w") as f: - import json - - json.dump(summary_json, f, indent=2) + dump_summary(summary, "summary_asyncio_as_completed.json") # We expect MainThread and the sampler expected_nthreads = 2 assert summary.nthreads == expected_nthreads, summary.threads assert summary.total_metric >= 1.4 * 1e6 - # Test stacks and expected values - # TODO: these stacks need to be adapted to Python 3.11 (qual names have changed) - # but in the current state they don't work at all anyway. - # Thread Pool Executor - if PY >= (3, 13): - for i in range(3, 12): - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "_AsCompletedIterator._wait_for_one", - "Queue.get", - f"Task-{i}", - "wait_and_return_delay", - "other", - "sleep", - ), - lambda v: v >= 0.00, - ) - elif PY >= (3, 11): - for i in range(3, 12): - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "as_completed.._wait_for_one", - "Queue.get", - f"Task-{i}", - "wait_and_return_delay", - "other", - "sleep", - ), - lambda v: v >= 0.00, - ) - else: - for i in range(3, 12): - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "_wait_for_one", - "get", - f"Task-{i}", - "wait_and_return_delay", - "other", - "sleep", - ), - lambda v: v >= 0.00, - ) + try: + if PY >= (3, 13): + for i in range(3, 12): + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "_AsCompletedIterator._wait_for_one", + "Queue.get", + f"Task-{i}", + "wait_and_return_delay", + "other", + "sleep", + ), + lambda v: v >= 0.00, + ) + + elif PY >= (3, 11): + for i in range(3, 12): + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "as_completed.._wait_for_one", + "Queue.get", + f"Task-{i}", + "wait_and_return_delay", + "other", + "sleep", + ), + lambda v: v >= 0.00, + ) + else: + for i in range(3, 12): + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "_wait_for_one", + "get", + f"Task-{i}", + "wait_and_return_delay", + "other", + "sleep", + ), + lambda v: v >= 0.00, + ) + except AssertionError: + print("stderr", result.stderr.decode()) + print(json.dumps(summary_to_json(summary), indent=4)) + raise diff --git a/tests/test_asyncio_async_generator.py b/tests/test_asyncio_async_generator.py index b62d991c..f6107b5a 100644 --- a/tests/test_asyncio_async_generator.py +++ b/tests/test_asyncio_async_generator.py @@ -1,9 +1,8 @@ -import pytest +import json -from tests.utils import DataSummary, run_target, retry_on_valueerror +from tests.utils import DataSummary, run_target, retry_on_valueerror, dump_summary, summary_to_json -@pytest.mark.xfail(reason="This test is very flaky") @retry_on_valueerror() def test_asyncio_async_generator_wall_time() -> None: result, data = run_target("target_async_generator") @@ -15,6 +14,7 @@ def test_asyncio_async_generator_wall_time() -> None: assert md["interval"] == "1000" summary = DataSummary(data) + dump_summary(summary, "summary_asyncio_async_generator.json") summary_json = {} for thread in summary.threads: @@ -31,16 +31,21 @@ def test_asyncio_async_generator_wall_time() -> None: assert summary.nthreads >= expected_nthreads, summary.threads assert summary.total_metric >= 1.4 * 1e6 - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "asynchronous_function", - "async_generator", - "async_generator_dep", - "deep_dependency", - "sleep", - ), - lambda v: v >= 0.0 - ) + try: + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "asynchronous_function", + "async_generator", + "async_generator_dep", + "deep_dependency", + "sleep", + ), + lambda v: v >= 0.0, + ) + except AssertionError: + print("stderr", result.stderr.decode()) + print(json.dumps(summary_to_json(summary), indent=2)) + raise diff --git a/tests/test_asyncio_context_manager.py b/tests/test_asyncio_context_manager.py index d0ff3717..86414154 100644 --- a/tests/test_asyncio_context_manager.py +++ b/tests/test_asyncio_context_manager.py @@ -1,10 +1,11 @@ import pytest -from tests.utils import PY, DataSummary, run_target, retry_on_valueerror +from tests.utils import PY, DataSummary, run_target, retry_on_valueerror, dump_summary @retry_on_valueerror() @pytest.mark.xfail(condition=PY >= (3, 13), reason="Sampling async context manager stacks does not work on >=3.13") +@retry_on_valueerror() def test_asyncio_context_manager_wall_time(): result, data = run_target("target_async_with") assert result.returncode == 0, result.stderr.decode() @@ -27,6 +28,8 @@ def test_asyncio_context_manager_wall_time(): if key and isinstance(next(iter(key)), str) ] + dump_summary(summary, "summary_asyncio_context_manager_wall_time.json") + expected_nthreads = 2 assert summary.nthreads >= expected_nthreads, summary.threads assert summary.total_metric >= 1.4 * 1e6 diff --git a/tests/test_asyncio_coroutines.py b/tests/test_asyncio_coroutines.py index b1ed852f..d3abb3a1 100644 --- a/tests/test_asyncio_coroutines.py +++ b/tests/test_asyncio_coroutines.py @@ -1,4 +1,6 @@ -from tests.utils import PY, DataSummary, run_target, retry_on_valueerror +import json + +from tests.utils import PY, DataSummary, run_target, dump_summary, summary_to_json, retry_on_valueerror @retry_on_valueerror() @@ -12,6 +14,7 @@ def test_asyncio_coroutines_wall_time(): assert md["interval"] == "1000" summary = DataSummary(data) + dump_summary(summary, "summary_asyncio_coroutines.json") # We expect MainThread and the sampler expected_nthreads = 2 @@ -20,17 +23,13 @@ def test_asyncio_coroutines_wall_time(): # Test stacks and expected values if PY >= (3, 11): - # TODO: these stacks need to be adapted to Python 3.11 (qual names have changed) - # but in the current state they don't work at all anyway. - # Thread Pool Executor summary.assert_substack( "0:MainThread", ( - "outer_function..background_math_function", - "main", + "Task-main", "outer_function", - "outer_function..main_coro", - "outer_function..sub_coro", + "Task-background_wait", + "outer_function..background_wait_function", "sleep", ), lambda v: v >= 0.001e6, @@ -39,47 +38,53 @@ def test_asyncio_coroutines_wall_time(): summary.assert_substack( "0:MainThread", ( - "outer_function..background_math_function", - "background_math", + "Task-background_wait", + "outer_function..background_wait_function", + "sleep", ), lambda v: v >= 0.001e6, ) - else: - # Main Thread + summary.assert_substack( + "0:MainThread", + ("Task-main", "outer_function"), + lambda v: v >= 0.001e6, + ) + summary.assert_substack( "0:MainThread", ( - "_run_module_as_main", - "_run_code", - "", - "run_until_complete", - "run_forever", - "_run_once", - "select", - "main", + "Task-main", "outer_function", - "main_coro", + "outer_function..main_coro", + "outer_function..sub_coro", "sleep", ), - lambda v: v >= 0.1e6, + lambda v: v >= 0.001e6, ) + try: + summary.assert_not_substack( + "0:MainThread", + ( + "outer_function..background_math_function", + "Task-background_wait", + "outer_function..background_wait_function", + "sleep", + ), + ) + except AssertionError: + print("stderr", result.stderr.decode()) + print(json.dumps(summary_to_json(summary), indent=4)) + raise + else: # PY < (3, 11) summary.assert_substack( "0:MainThread", ( - "_run_module_as_main", - "_run_code", - "", - "run_until_complete", - "run_forever", - "_run_once", - "_run", - "background_math_function", - "main", + "Task-main", "outer_function", - "main_coro", - "sub_coro", + "Task-background_wait", + "background_wait_function", "sleep", ), lambda v: v >= 0.001e6, @@ -88,31 +93,23 @@ def test_asyncio_coroutines_wall_time(): summary.assert_substack( "0:MainThread", ( - "_run_module_as_main", - "_run_code", - "", - "run_until_complete", - "run_forever", - "_run_once", - "_run", - "background_math_function", - "background_math", + "Task-background_wait", + "background_wait_function", + "sleep", ), lambda v: v >= 0.001e6, ) - # Thread Pool Executor + + summary.assert_substack( + "0:MainThread", + ("Task-main", "outer_function"), + lambda v: v >= 0.001e6, + ) + summary.assert_substack( "0:MainThread", ( - "_run_module_as_main", - "_run_code", - "", - "run_until_complete", - "run_forever", - "_run_once", - "_run", - "background_math_function", - "main", + "Task-main", "outer_function", "main_coro", "sub_coro", @@ -120,3 +117,18 @@ def test_asyncio_coroutines_wall_time(): ), lambda v: v >= 0.001e6, ) + + try: + summary.assert_not_substack( + "0:MainThread", + ( + "background_math_function", + "Task-background_wait", + "background_wait_function", + "sleep", + ), + ) + except AssertionError: + print("stderr", result.stderr.decode()) + print(json.dumps(summary_to_json(summary), indent=4)) + raise diff --git a/tests/test_asyncio_deadlock.py b/tests/test_asyncio_deadlock.py index 8fe3edd2..152f2168 100644 --- a/tests/test_asyncio_deadlock.py +++ b/tests/test_asyncio_deadlock.py @@ -3,5 +3,5 @@ @retry_on_valueerror() def test_asyncio_deadlock(): - result, data = run_target("target_async_deadlock") + result, _ = run_target("target_async_deadlock") assert result.returncode == 0, result.stderr.decode() diff --git a/tests/test_asyncio_executor.py b/tests/test_asyncio_executor.py index 7fe7a14e..1a8d771d 100644 --- a/tests/test_asyncio_executor.py +++ b/tests/test_asyncio_executor.py @@ -1,4 +1,4 @@ -from tests.utils import PY, DataSummary, run_target, retry_on_valueerror +from tests.utils import PY, DataSummary, run_target, dump_summary, retry_on_valueerror @retry_on_valueerror() @@ -24,6 +24,8 @@ def test_asyncio_executor(): if key and isinstance(next(iter(key)), str) ] + dump_summary(summary, "summary_asyncio_executor.json") + expected_nthreads = 3 assert summary.nthreads >= expected_nthreads, summary.threads assert summary.total_metric >= 1.4 * 1e6 @@ -37,7 +39,7 @@ def test_asyncio_executor(): "main", "asynchronous_function", ), - lambda v: v >= 0.01e6, + lambda v: v > 0.00, ) # Thread Pool Executor @@ -63,7 +65,7 @@ def test_asyncio_executor(): "main", "asynchronous_function", ), - lambda v: v >= 0.1e6, + lambda v: v > 0.0, ) if PY >= (3, 9): diff --git a/tests/test_asyncio_gather_coroutines.py b/tests/test_asyncio_gather_coroutines.py index ba1c7782..8674a172 100644 --- a/tests/test_asyncio_gather_coroutines.py +++ b/tests/test_asyncio_gather_coroutines.py @@ -1,7 +1,10 @@ +import json from tests.utils import DataSummary from tests.utils import run_target +from tests.utils import dump_summary from tests.utils import retry_on_valueerror +from tests.utils import summary_to_json @retry_on_valueerror() @@ -15,63 +18,58 @@ def test_asyncio_gather_coroutines_wall_time(): assert md["interval"] == "1000" summary = DataSummary(data) + dump_summary(summary, "summary_asyncio_gather_coroutines.json") - summary_json = {} - for thread in summary.threads: - summary_json[thread] = [ - { - "stack": key, - "metric": value, - } - for key, value in summary.threads[thread].items() - if key and isinstance(next(iter(key)), str) - ] - - # We expect to see one stack for Task-1 / Task-2 / inner_1 and one for Task-1 / Task-3 / inner_2 try: - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "Task-2", - "inner_1", - "sleep", - ), - lambda v: v >= 0.0, - ) - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "Task-3", - "inner_2", - "sleep", - ), - lambda v: v >= 0.0, - ) + # We expect to see one stack for Task-1 / Task-2 / inner_1 and one for Task-1 / Task-3 / inner_2 + try: + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "Task-2", + "inner_1", + "sleep", + ), + lambda v: v >= 0.0, + ) + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "Task-3", + "inner_2", + "sleep", + ), + lambda v: v >= 0.0, + ) + except AssertionError: + # Search the other way around - Task 1 / Task 3 / inner_1 and Task 1 / Task 2 / inner_2 + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "Task-2", + "inner_2", + "sleep", + ), + lambda v: v >= 0.0, + ) + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "Task-3", + "inner_1", + "sleep", + ), + lambda v: v >= 0.0, + ) except AssertionError: - # Search the other way around - Task 1 / Task 3 / inner_1 and Task 1 / Task 2 / inner_2 - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "Task-2", - "inner_2", - "sleep", - ), - lambda v: v >= 0.0, - ) - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "Task-3", - "inner_1", - "sleep", - ), - lambda v: v >= 0.0, - ) + print("stderr", result.stderr.decode()) + print(json.dumps(summary_to_json(summary), indent=4)) + raise diff --git a/tests/test_asyncio_gather_tasks.py b/tests/test_asyncio_gather_tasks.py index a8137ac8..1b50c392 100644 --- a/tests/test_asyncio_gather_tasks.py +++ b/tests/test_asyncio_gather_tasks.py @@ -1,9 +1,12 @@ +import json import sys from tests.utils import PY from tests.utils import DataSummary from tests.utils import run_target from tests.utils import retry_on_valueerror +from tests.utils import dump_summary +from tests.utils import summary_to_json @retry_on_valueerror() @@ -18,6 +21,8 @@ def test_asyncio_gather_tasks_wall_time(): summary = DataSummary(data) + dump_summary(summary, "summary_asyncio_gather_tasks.json") + expected_nthreads = 2 assert summary.nthreads == expected_nthreads, summary.threads assert summary.total_metric >= 1.4 * 1e6 @@ -30,61 +35,85 @@ def test_asyncio_gather_tasks_wall_time(): summary.query("0:MainThread", (("F4_1", 0), ("f4", 22), ("f5", 26))) is not None ) - # Test stacks and expected values - if PY >= (3, 11): - for t in ("F4_0", "F4_1"): - summary.assert_substack( - "0:MainThread", - ( - "_run_module_as_main", - "_run_code", - "", - "run", - "Runner.run", - "BaseEventLoop.run_until_complete", - "BaseEventLoop.run_forever", - "BaseEventLoop._run_once", - "KqueueSelector.select" - if sys.platform == "darwin" - else "EpollSelector.select", - "Task-1", - "main", - "F1", - "f1", - "f2", - "F3", - "f3", - t, - "f4", - "f5", - "sleep", - ), - lambda v: v >= 0.45e6, - ) - else: - for t in ("F4_0", "F4_1"): - summary.assert_substack( - "0:MainThread", - ( - "_run_module_as_main", - "_run_code", - "", - "run", - "run_until_complete", - "run_forever", - "_run_once", - "select", - "Task-1", - "main", - "F1", - "f1", - "f2", - "F3", - "f3", - t, - "f4", - "f5", - "sleep", - ), - lambda v: v >= 0.45e6, - ) + try: + # Test stacks and expected values + if PY >= (3, 11): + for t in ("F4_0", "F4_1"): + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "F1", + "f1", + "f2", + "F3", + "f3", + t, + "f4", + "f5", + "sleep", + ), + lambda v: v >= 0.45e6, + ) + summary.assert_substack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "run", + "Runner.run", + "BaseEventLoop.run_until_complete", + "BaseEventLoop.run_forever", + "BaseEventLoop._run_once", + ( + "KqueueSelector.select" + if sys.platform == "darwin" + else "EpollSelector.select" + ), + "Task-1", + "main", + "F1", + "f1", + "f2", + "F3", + "f3", + t, + "f4", + "f5", + "sleep", + ), + lambda v: v >= 0.45e6, + ) + else: + for t in ("F4_0", "F4_1"): + summary.assert_substack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "run", + "run_until_complete", + "run_forever", + "_run_once", + "select", + "Task-1", + "main", + "F1", + "f1", + "f2", + "F3", + "f3", + t, + "f4", + "f5", + "sleep", + ), + lambda v: v >= 0.45e6, + ) + except AssertionError: + print("stderr", result.stderr.decode()) + print(json.dumps(summary_to_json(summary), indent=4)) + raise diff --git a/tests/test_asyncio_recursive_on_cpu_coros.py b/tests/test_asyncio_recursive_on_cpu_coros.py new file mode 100644 index 00000000..2d00747a --- /dev/null +++ b/tests/test_asyncio_recursive_on_cpu_coros.py @@ -0,0 +1,78 @@ +from tests.utils import PY, DataSummary +from tests.utils import dump_summary, run_target, retry_on_valueerror + + +@retry_on_valueerror() +def test_asyncio_recursive_on_cpu_coros(): + result, data = run_target("target_asyncio_recursive_on_cpu_coros", "-c") + assert result.returncode == 0, result.stderr.decode() + + assert data is not None + md = data.metadata + assert md["mode"] == "cpu" + assert md["interval"] == "1000" + + summary = DataSummary(data) + + summary_json = {} + for thread in summary.threads: + summary_json[thread] = [ + { + "stack": key, + "metric": value, + } + for key, value in summary.threads[thread].items() + if key and isinstance(next(iter(key)), str) + ] + + dump_summary(summary, "summary_asyncio_recursive_on_cpu_coros.json") + + if PY >= (3, 11): + summary.assert_stack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "main_sync", + "run", + "Runner.run", + "BaseEventLoop.run_until_complete", + "BaseEventLoop.run_forever", + "BaseEventLoop._run_once", + "Handle._run", + "Task-1", + "async_main", + "outer", + "inner1", + "inner2", + "inner3", + "sync_code_outer", + "sync_code", + ), + lambda v: v >= 0.9 * 1e6, + ) + else: + summary.assert_stack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "main_sync", + "run", + "run_until_complete", + "run_forever", + "_run_once", + "_run", + "Task-1", + "async_main", + "outer", + "inner1", + "inner2", + "inner3", + "sync_code_outer", + "sync_code", + ), + lambda v: v >= 0.9 * 1e6, + ) diff --git a/tests/test_asyncio_recursive_on_cpu_tasks.py b/tests/test_asyncio_recursive_on_cpu_tasks.py new file mode 100644 index 00000000..06f7728f --- /dev/null +++ b/tests/test_asyncio_recursive_on_cpu_tasks.py @@ -0,0 +1,75 @@ +import json + +from tests.utils import PY, DataSummary, summary_to_json +from tests.utils import dump_summary, run_target, retry_on_valueerror + + +@retry_on_valueerror() +def test_asyncio_recursive_on_cpu_tasks(): + result, data = run_target("target_asyncio_recursive_on_cpu_tasks", "-c") + assert result.returncode == 0, result.stderr.decode() + + assert data is not None + md = data.metadata + assert md["mode"] == "cpu" + assert md["interval"] == "1000" + + summary = DataSummary(data) + dump_summary(summary, "summary_asyncio_recursive_on_cpu_tasks.json") + + try: + if PY >= (3, 11): + summary.assert_stack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "main_sync", + "run", + "Runner.run", + "BaseEventLoop.run_until_complete", + "BaseEventLoop.run_forever", + "BaseEventLoop._run_once", + "Handle._run", + "Task-1", + "async_main", + "outer", + "inner1", + "Task-2", + "inner2", + "inner3", + "sync_code_outer", + "sync_code", + ), + lambda v: v >= 0.9 * 1e6, + ) + else: + summary.assert_stack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "main_sync", + "run", + "run_until_complete", + "run_forever", + "_run_once", + "_run", + "Task-1", + "async_main", + "outer", + "inner1", + "Task-2", + "inner2", + "inner3", + "sync_code_outer", + "sync_code", + ), + lambda v: v >= 0.9 * 1e6, + ) + except AssertionError: + print("stderr", result.stderr.decode()) + print(json.dumps(summary_to_json(summary), indent=2)) + raise diff --git a/tests/test_asyncio_wait.py b/tests/test_asyncio_wait.py index 8db3d82d..caeefac1 100644 --- a/tests/test_asyncio_wait.py +++ b/tests/test_asyncio_wait.py @@ -1,3 +1,5 @@ +import json + from tests.utils import DataSummary, run_target, retry_on_valueerror @@ -24,19 +26,23 @@ def test_asyncio_wait(): if key and isinstance(next(iter(key)), str) ] - # Test that we see the stitched stacks (Task-1 / outer / wait / _wait / inner- / inner / sleep) - for i in range(10): - summary.assert_substack( - "0:MainThread", - ( - "Task-1", - "main", - "outer", - "wait", - "_wait", - f"inner-{i}", - "inner", - "sleep", - ), - lambda v: v >= 0.0, - ) + try: + # Test that we see the stitched stacks (Task-1 / outer / wait / _wait / inner- / inner / sleep) + for i in range(10): + summary.assert_substack( + "0:MainThread", + ( + "Task-1", + "main", + "outer", + "wait", + "_wait", + f"inner-{i}", + "inner", + "sleep", + ), + lambda v: v >= 0.0, + ) + except AssertionError: + print(json.dumps(summary_json, indent=2)) + raise diff --git a/tests/test_asyncio_within_function.py b/tests/test_asyncio_within_function.py new file mode 100644 index 00000000..74ecd9e3 --- /dev/null +++ b/tests/test_asyncio_within_function.py @@ -0,0 +1,164 @@ +import json +import sys + +from tests.utils import PY +from tests.utils import DataSummary +from tests.utils import dump_summary +from tests.utils import run_target +from tests.utils import retry_on_valueerror + + +@retry_on_valueerror() +def test_asyncio_within_function(): + result, data = run_target("target_asyncio_within_function") + assert result.returncode == 0, result.stderr.decode() + + assert data is not None + md = data.metadata + assert md["mode"] == "wall" + assert md["interval"] == "1000" + + summary = DataSummary(data) + + summary_json = {} + for thread in summary.threads: + summary_json[thread] = sorted( + [ + { + "stack": key, + "metric": value, + } + for key, value in summary.threads[thread].items() + if key and isinstance(next(iter(key)), str) + ], + key=lambda x: x["metric"], + reverse=True, + ) + + dump_summary(summary, "summary_asyncio_within_function.json") + + # We expect MainThread and the sampler + expected_nthreads = 2 + assert summary.nthreads == expected_nthreads, summary.threads + assert summary.total_metric >= 1.4 * 1e6 + + try: + # sync_main / async_starter / (Task-1) / async_main / outer / inner / synchronous_code / synchronous_code_dep (/ time.sleep) + if PY >= (3, 11): + summary.assert_substack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "sync_main", + "async_starter", + "run", + "Runner.run", + "BaseEventLoop.run_until_complete", + "BaseEventLoop.run_forever", + "BaseEventLoop._run_once", + "Handle._run", + "Task-1", + "async_main", + "outer", + "inner", + "synchronous_code", + "synchronous_code_dep" + # We don't have time.sleep because it's a C function. + ), + lambda v: v >= (0.25 * 0.9) * 1e6, + ) + + # sync_main / async_starter / (Task-1) / async_main / outer / asyncio.sleep + summary.assert_substack( + "0:MainThread", + ( + "", + "sync_main", + "async_starter", + "run", + "Runner.run", + "BaseEventLoop.run_until_complete", + "BaseEventLoop.run_forever", + "BaseEventLoop._run_once", + f'{"KqueueSelector" if sys.platform == "darwin" else "EpollSelector"}.select', + "Task-1", + "async_main", + "outer", + "sleep", # asyncio.sleep + ), + lambda v: v >= (0.25 * 0.9) * 1e6, + ) + + # sync_main / async_starter / (Task-1) / async_main / outer / asyncio.sleep + summary.assert_stack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "sync_main", + ), + lambda v: v >= (0.25 * 0.9) * 1e6, + ) + else: + + summary.assert_substack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "sync_main", + "async_starter", + "run", + "run_until_complete", + "run_forever", + "_run_once", + "_run", + "Task-1", + "async_main", + "outer", + "inner", + "synchronous_code", + "synchronous_code_dep" + # We don't have time.sleep because it's a C function. + ), + lambda v: v >= (0.25 * 0.9) * 1e6, + ) + + # sync_main / async_starter / (Task-1) / async_main / outer / asyncio.sleep + summary.assert_substack( + "0:MainThread", + ( + "", + "sync_main", + "async_starter", + "run", + "run_until_complete", + "run_forever", + "_run_once", + "select", + "Task-1", + "async_main", + "outer", + "sleep", # asyncio.sleep + ), + lambda v: v >= (0.25 * 0.9) * 1e6, + ) + + # sync_main / async_starter / (Task-1) / async_main / outer / asyncio.sleep + summary.assert_stack( + "0:MainThread", + ( + "_run_module_as_main", + "_run_code", + "", + "sync_main", + ), + lambda v: v >= (0.25 * 0.9) * 1e6, + ) + except AssertionError: + print(json.dumps(summary_json, indent=4)) + raise diff --git a/tests/test_gevent.py b/tests/test_gevent.py index 529ec40b..90730e42 100644 --- a/tests/test_gevent.py +++ b/tests/test_gevent.py @@ -2,9 +2,17 @@ from itertools import count from types import FunctionType +import pytest + from tests.utils import PY, DataSummary, run_target, retry_on_valueerror +try: + import gevent # noqa: F401 +except ImportError: + pytest.skip("gevent not installed", allow_module_level=True) + + def get_line_number(function: FunctionType, content: str) -> int: code = function.__code__ start = code.co_firstlineno diff --git a/tests/test_wall_data.py b/tests/test_wall_data.py index 6cc96396..4aa69645 100644 --- a/tests/test_wall_data.py +++ b/tests/test_wall_data.py @@ -7,6 +7,7 @@ from tests.utils import retry_on_valueerror +@pytest.mark.skip() @retry_on_valueerror() @stealth def test_wall_time(stealth):