diff --git a/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h b/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h index 4c71e67f5cf..43da67012bb 100644 --- a/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h +++ b/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h @@ -219,7 +219,7 @@ class TaskInfo } [[nodiscard]] static Result current(PyObject*); - inline size_t unwind(FrameStack&, size_t& upper_python_stack_size); + inline size_t unwind(FrameStack&); }; inline std::unordered_map task_link_map; @@ -300,10 +300,12 @@ inline std::vector> current_tasks; // ---------------------------------------------------------------------------- inline size_t -TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size) +TaskInfo::unwind(FrameStack& stack) { // TODO: Check for running task. - std::stack coro_frames; + + // Use a vector-based std::stack as we only push_back/pop_back + std::stack> coro_frames; // Unwind the coro chain for (auto py_coro = this->coro.get(); py_coro != NULL; py_coro = py_coro->await.get()) { @@ -331,7 +333,7 @@ TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size) // use the number of Frames added to the Stack to determine the size of the upper Python stack. 
if (count == 0) { // The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1 - upper_python_stack_size = new_frames - 1; + auto upper_python_stack_size = new_frames - 1; // Remove the Python Frames from the Stack (they will be added back later) // We cannot push those Frames now because otherwise they would be added once per Task, diff --git a/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h b/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h index 47d045d5562..2ade1dde847 100644 --- a/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h +++ b/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h @@ -219,6 +219,52 @@ ThreadInfo::unwind(PyThreadState* tstate) inline Result ThreadInfo::unwind_tasks(PyThreadState* tstate) { + // The size of the "pure Python" stack (before asyncio Frames), computed later by walking the Python Stack + size_t upper_python_stack_size = 0; + + // Check if the Python stack contains "_run". + // To avoid having to do string comparisons every time we unwind Tasks, we keep track + // of the cache key of the "_run" Frame. + static std::optional frame_cache_key; + if (!frame_cache_key) { + for (size_t i = 0; i < python_stack.size(); i++) { + const auto& frame = python_stack[i].get(); + const auto& frame_name = string_table.lookup(frame.name)->get(); + +#if PY_VERSION_HEX >= 0x030b0000 + // From Python 3.11 onwards, function names in Frames are qualified with e.g. the class name, so we + // can use the qualified name to identify the "_run" Frame. + constexpr std::string_view _run = "Handle._run"; + auto is_run_frame = frame_name == _run; +#else + // Before Python 3.11, function names in Frames are not qualified, so we + // can use the filename to identify the "_run" Frame. 
+ constexpr std::string_view asyncio_runners_py = "asyncio/events.py"; + constexpr std::string_view _run = "_run"; + auto filename = string_table.lookup(frame.filename)->get(); + auto is_asyncio = filename.rfind(asyncio_runners_py) == filename.size() - asyncio_runners_py.size(); + auto is_run_frame = is_asyncio && (frame_name.rfind(_run) == frame_name.size() - _run.size()); +#endif + if (is_run_frame) { + // Although Frames are stored in an LRUCache, the cache key is ALWAYS the same + // even if the Frame gets evicted from the cache. + // This means we can keep the cache key and re-use it to determine + // whether we see the "_run" Frame in the Python stack. + frame_cache_key = frame.cache_key; + upper_python_stack_size = python_stack.size() - i; + break; + } + } + } else { + for (size_t i = 0; i < python_stack.size(); i++) { + const auto& frame = python_stack[i].get(); + if (frame.cache_key == *frame_cache_key) { + upper_python_stack_size = python_stack.size() - i; + break; + } + } + } + std::vector leaf_tasks; std::unordered_set parent_tasks; std::unordered_map waitee_map; // Indexed by task origin @@ -288,23 +334,14 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) } } - // The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind - size_t upper_python_stack_size = 0; - // Unused variable, will be used later by TaskInfo::unwind - size_t unused; - - bool on_cpu_task_seen = false; for (auto& leaf_task : leaf_tasks) { - on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu; - auto stack_info = std::make_unique(leaf_task.get().name, leaf_task.get().is_on_cpu); auto& stack = stack_info->stack; for (auto current_task = leaf_task;;) { auto& task = current_task.get(); - // The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames - size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? 
upper_python_stack_size : unused); + auto task_stack_size = task.unwind(stack); if (task.is_on_cpu) { // Get the "bottom" part of the Python synchronous Stack, that is to say the // synchronous functions and coroutines called by the Task's outermost coroutine @@ -312,6 +349,10 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) // subtract the number of Frames in the "upper Python stack" (asyncio machinery + sync entrypoint) // This gives us [outermost coroutine, ... , innermost coroutine, outermost sync function, ... , // innermost sync function] + // TODO: This may be incorrect if the Task that we know is on CPU does not match the Task that + // actually was on CPU when the Python Thread Stack was captured. One way to work around this + // may be to look at every Task Stack and match it against the Thread Stack. This would be + // somewhat costly though, and so far I have not seen a single instance of this race condition. size_t frames_to_push = (python_stack.size() > task_stack_size) ? python_stack.size() - task_stack_size : 0; for (size_t i = 0; i < frames_to_push; i++) { @@ -353,7 +394,7 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) // one we saw in TaskInfo::unwind. This is extremely unlikely, I believe, but failing to account for it would // cause an underflow, so let's be conservative. 
size_t start_index = 0; - if (on_cpu_task_seen && python_stack.size() >= upper_python_stack_size) { + if (python_stack.size() >= upper_python_stack_size) { start_index = python_stack.size() - upper_python_stack_size; } for (size_t i = start_index; i < python_stack.size(); i++) { diff --git a/releasenotes/notes/profiling-improve-on-cpu-stacks-f3047328da22fce1.yaml b/releasenotes/notes/profiling-improve-on-cpu-stacks-f3047328da22fce1.yaml new file mode 100644 index 00000000000..fac78ccd8f8 --- /dev/null +++ b/releasenotes/notes/profiling-improve-on-cpu-stacks-f3047328da22fce1.yaml @@ -0,0 +1,4 @@ +fixes: + - | + profiling: This improves the accuracy of stacks for on-CPU asyncio Tasks by reducing the odds of Frames for a Task + polluting the stack of other Tasks. diff --git a/tests/profiling/collector/test_asyncio_recursive_on_cpu_tasks.py b/tests/profiling/collector/test_asyncio_recursive_on_cpu_tasks.py index d0bca2aee6b..1d67fc26454 100644 --- a/tests/profiling/collector/test_asyncio_recursive_on_cpu_tasks.py +++ b/tests/profiling/collector/test_asyncio_recursive_on_cpu_tasks.py @@ -1,7 +1,6 @@ import pytest -@pytest.mark.xfail(reason="This test is flaky due to a race condition, see PROF-13137") @pytest.mark.subprocess( env=dict( DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_recursive_on_cpu_tasks", @@ -132,11 +131,13 @@ def loc(f_name: str) -> StackLocation: loc(""), loc("main_sync"), loc("run"), - loc("Runner.run"), - loc("BaseEventLoop.run_until_complete"), - loc("BaseEventLoop.run_forever"), - loc("BaseEventLoop._run_once"), - loc("Handle._run"), + ] + + ([loc(f"{runner_prefix}run")] if PYVERSION >= (3, 11) else []) + + [ + loc(f"{base_event_loop_prefix}run_until_complete"), + loc(f"{base_event_loop_prefix}run_forever"), + loc(f"{base_event_loop_prefix}_run_once"), + loc(f"{handle_prefix}_run"), # loc("Task-1"), loc("async_main"), loc("outer"),