diff --git a/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h b/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h index 25636f7ad69..9e77dc26ce0 100644 --- a/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h +++ b/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -219,7 +220,7 @@ class TaskInfo } [[nodiscard]] static Result current(PyObject*); - inline size_t unwind(FrameStack&, size_t& upper_python_stack_size); + inline Result unwind(FrameStack&); }; inline std::unordered_map task_link_map; @@ -299,11 +300,11 @@ inline std::vector> current_tasks; // ---------------------------------------------------------------------------- -inline size_t -TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size) +inline Result +TaskInfo::unwind(FrameStack& stack) { // TODO: Check for running task. - std::stack coro_frames; + std::stack> coro_frames; // Unwind the coro chain // Detect cycles in the await chain to prevent infinite loops. @@ -342,7 +343,15 @@ TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size) // use the number of Frames added to the Stack to determine the size of the upper Python stack. if (count == 0) { // The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1 - upper_python_stack_size = new_frames - 1; + size_t upper_python_stack_size = new_frames - 1; + + // If the Task is on CPU, then we should have at least the asyncio runtime Frames on top of + // the asynchronous Stack. If we do not have any Python Frames, then the execution state changed + // (race condition) and we cannot recover (for on-CPU Tasks, we want to see the full execution + // Stack, which we won't if we have a non-running Python Stack). + if (this->is_on_cpu && upper_python_stack_size == 0) { + return ErrorKind::TaskInfoError; + } // Remove the Python Frames from the Stack (they will be added back later) // We cannot push those Frames now because otherwise they would be added once per Task, diff --git a/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h b/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h index 8f09dd3dcc5..c6a591c89d0 100644 --- a/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h +++ b/ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h @@ -12,11 +12,13 @@ #endif #include +#include #include #include #include #include #include +#include #if defined PL_LINUX #include @@ -219,6 +221,57 @@ ThreadInfo::unwind(PyThreadState* tstate) inline Result ThreadInfo::unwind_tasks(PyThreadState* tstate) { + // The size of the "pure Python" stack (before asyncio Frames), computed later by walking the Python Stack + size_t upper_python_stack_size = 0; + + // Check if the Python stack contains "_run". + // To avoid having to do string comparisons every time we unwind Tasks, we keep track + // of the cache key of the "_run" Frame. + static std::optional frame_cache_key; + bool expect_at_least_one_running_task = false; + if (!frame_cache_key) { + for (size_t i = 0; i < python_stack.size(); i++) { + const auto& frame = python_stack[i].get(); + const auto& frame_name = string_table.lookup(frame.name)->get(); + +#if PY_VERSION_HEX >= 0x030b0000 + // After Python 3.11, function names in Frames are qualified with e.g. the class name, so we + // can use the qualified name to identify the "_run" Frame. + constexpr std::string_view _run = "Handle._run"; + auto is_run_frame = frame_name.size() >= _run.size() && frame_name == _run; +#else + // Before Python 3.11, function names in Frames are not qualified, so we + // can use the filename to identify the "_run" Frame. + constexpr std::string_view asyncio_runners_py = "asyncio/events.py"; + constexpr std::string_view _run = "_run"; + auto filename = string_table.lookup(frame.filename)->get(); + auto is_asyncio = filename.size() >= asyncio_runners_py.size() && + filename.rfind(asyncio_runners_py) == filename.size() - asyncio_runners_py.size(); + auto is_run_frame = is_asyncio && (frame_name.size() >= _run.size() && + frame_name.rfind(_run) == frame_name.size() - _run.size()); +#endif + if (is_run_frame) { + // Although Frames are stored in an LRUCache, the cache key is ALWAYS the same + // even if the Frame gets evicted from the cache. + // This means we can keep the cache key and re-use it to determine + // whether we see the "_run" Frame in the Python stack. + frame_cache_key = frame.cache_key; + expect_at_least_one_running_task = true; + upper_python_stack_size = python_stack.size() - i; + break; + } + } + } else { + for (size_t i = 0; i < python_stack.size(); i++) { + const auto& frame = python_stack[i].get(); + if (frame.cache_key == *frame_cache_key) { + expect_at_least_one_running_task = true; + upper_python_stack_size = python_stack.size() - i; + break; + } + } + } + std::vector leaf_tasks; std::unordered_set parent_tasks; std::unordered_map waitee_map; // Indexed by task origin @@ -231,6 +284,27 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) } auto all_tasks = std::move(*maybe_all_tasks); + + // Guard against the critical case where a Task is marked as running (on-CPU) but the Python Stack + // doesn't show the asyncio runtime frames. This would cause incorrect frame popping. + // The opposite case (Stack shows runtime frames but no task is marked as running) is less critical + // and can happen when the frame detection doesn't match the specific asyncio implementation, + // especially in Python 3.10 where we look for "_run" in "asyncio/events.py" but the actual + // stack may show "_run_once" instead. + bool at_least_one_running_task_seen = false; + for (const auto& task_ref : all_tasks) { + const auto& task = task_ref.get(); + if (task->is_on_cpu) { + at_least_one_running_task_seen = true; + break; + } + } + + // Only error if a task is running but the stack doesn't show it (critical case) + if (at_least_one_running_task_seen && !expect_at_least_one_running_task) { + return ErrorKind::TaskInfoError; + } + { std::lock_guard lock(task_link_map_lock); @@ -288,15 +362,7 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) } } - // The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind - size_t upper_python_stack_size = 0; - // Unused variable, will be used later by TaskInfo::unwind - size_t unused; - - bool on_cpu_task_seen = false; for (auto& leaf_task : leaf_tasks) { - on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu; - auto stack_info = std::make_unique(leaf_task.get().name, leaf_task.get().is_on_cpu); auto& stack = stack_info->stack; @@ -310,8 +376,14 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) } seen_task_origins.insert(task.origin); + auto maybe_task_stack_size = task.unwind(stack); + if (!maybe_task_stack_size) { + // Skip the current Task Stack (Leaf Task to top) + break; + } + // The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames - auto task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused); + auto task_stack_size = *maybe_task_stack_size; if (task.is_on_cpu) { // Get the "bottom" part of the Python synchronous Stack, that is to say the // synchronous functions and coroutines called by the Task's outermost coroutine @@ -360,7 +432,7 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) // one we saw in TaskInfo::unwind. This is extremely unlikely, I believe, but failing to account for it would // cause an underflow, so let's be conservative. size_t start_index = 0; - if (on_cpu_task_seen && python_stack.size() >= upper_python_stack_size) { + if (expect_at_least_one_running_task && python_stack.size() >= upper_python_stack_size) { start_index = python_stack.size() - upper_python_stack_size; } for (size_t i = start_index; i < python_stack.size(); i++) { diff --git a/releasenotes/notes/profiling-on-cpu-task-race-condition-workaround-93a01ac9f8d4983c.yaml b/releasenotes/notes/profiling-on-cpu-task-race-condition-workaround-93a01ac9f8d4983c.yaml new file mode 100644 index 00000000000..080b5ae65de --- /dev/null +++ b/releasenotes/notes/profiling-on-cpu-task-race-condition-workaround-93a01ac9f8d4983c.yaml @@ -0,0 +1,4 @@ +fixes: + - | + profiling: This fix works around a race condition leading to incorrect stacks sometimes being reported when + profiling on-CPU asyncio Tasks. diff --git a/tests/profiling/collector/test_asyncio_recursive_on_cpu_tasks.py b/tests/profiling/collector/test_asyncio_recursive_on_cpu_tasks.py index d0bca2aee6b..1d67fc26454 100644 --- a/tests/profiling/collector/test_asyncio_recursive_on_cpu_tasks.py +++ b/tests/profiling/collector/test_asyncio_recursive_on_cpu_tasks.py @@ -1,7 +1,6 @@ import pytest -@pytest.mark.xfail(reason="This test is flaky due to a race condition, see PROF-13137") @pytest.mark.subprocess( env=dict( DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_recursive_on_cpu_tasks", @@ -132,11 +131,13 @@ def loc(f_name: str) -> StackLocation: loc(""), loc("main_sync"), loc("run"), - loc("Runner.run"), - loc("BaseEventLoop.run_until_complete"), - loc("BaseEventLoop.run_forever"), - loc("BaseEventLoop._run_once"), - loc("Handle._run"), + ] + + ([loc(f"{runner_prefix}run")] if PYVERSION >= (3, 11) else []) + + [ + loc(f"{base_event_loop_prefix}run_until_complete"), + loc(f"{base_event_loop_prefix}run_forever"), + loc(f"{base_event_loop_prefix}_run_once"), + loc(f"{handle_prefix}_run"), # loc("Task-1"), loc("async_main"), loc("outer"),