Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class TaskInfo
}

[[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
inline size_t unwind(FrameStack&, size_t& upper_python_stack_size);
inline size_t unwind(FrameStack&);
};

inline std::unordered_map<PyObject*, PyObject*> task_link_map;
Expand Down Expand Up @@ -300,10 +300,12 @@ inline std::vector<std::unique_ptr<StackInfo>> current_tasks;
// ----------------------------------------------------------------------------

inline size_t
TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
TaskInfo::unwind(FrameStack& stack)
{
// TODO: Check for running task.
std::stack<PyObject*> coro_frames;

// Use a vector-based std::stack as we only push_back/pop_back
std::stack<PyObject*, std::vector<PyObject*>> coro_frames;

// Unwind the coro chain
for (auto py_coro = this->coro.get(); py_coro != NULL; py_coro = py_coro->await.get()) {
Expand Down Expand Up @@ -331,7 +333,7 @@ TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
// use the number of Frames added to the Stack to determine the size of the upper Python stack.
if (count == 0) {
// The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1
upper_python_stack_size = new_frames - 1;
auto upper_python_stack_size = new_frames - 1;

// Remove the Python Frames from the Stack (they will be added back later)
// We cannot push those Frames now because otherwise they would be added once per Task,
Expand Down
63 changes: 52 additions & 11 deletions ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,52 @@ ThreadInfo::unwind(PyThreadState* tstate)
inline Result<void>
ThreadInfo::unwind_tasks(PyThreadState* tstate)
{
// The size of the "pure Python" stack (before asyncio Frames), computed just below by walking the Python stack
size_t upper_python_stack_size = 0;

// Check if the Python stack contains "_run".
// To avoid having to do string comparisons every time we unwind Tasks, we keep track
// of the cache key of the "_run" Frame.
static std::optional<Frame::Key> frame_cache_key;
if (!frame_cache_key) {
for (size_t i = 0; i < python_stack.size(); i++) {
const auto& frame = python_stack[i].get();
const auto& frame_name = string_table.lookup(frame.name)->get();

#if PY_VERSION_HEX >= 0x030b0000
// From Python 3.11 onwards, function names in Frames are qualified with e.g. the class name, so we
// can use the qualified name to identify the "_run" Frame.
constexpr std::string_view _run = "Handle._run";
auto is_run_frame = frame_name == _run;
#else
// Before Python 3.11, function names in Frames are not qualified, so we
// also check the filename (asyncio/events.py) to identify the "_run" Frame.
constexpr std::string_view asyncio_runners_py = "asyncio/events.py";
constexpr std::string_view _run = "_run";
auto filename = string_table.lookup(frame.filename)->get();
auto is_asyncio = filename.rfind(asyncio_runners_py) == filename.size() - asyncio_runners_py.size();
auto is_run_frame = is_asyncio && (frame_name.rfind(_run) == frame_name.size() - _run.size());
#endif
if (is_run_frame) {
// Although Frames are stored in an LRUCache, the cache key is ALWAYS the same
// even if the Frame gets evicted from the cache.
// This means we can keep the cache key and re-use it to determine
// whether we see the "_run" Frame in the Python stack.
frame_cache_key = frame.cache_key;
upper_python_stack_size = python_stack.size() - i;
break;
}
}
} else {
for (size_t i = 0; i < python_stack.size(); i++) {
const auto& frame = python_stack[i].get();
if (frame.cache_key == *frame_cache_key) {
upper_python_stack_size = python_stack.size() - i;
break;
}
}
}

std::vector<TaskInfo::Ref> leaf_tasks;
std::unordered_set<PyObject*> parent_tasks;
std::unordered_map<PyObject*, TaskInfo::Ref> waitee_map; // Indexed by task origin
Expand Down Expand Up @@ -288,30 +334,25 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate)
}
}

// The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind
size_t upper_python_stack_size = 0;
// Unused variable, will be used later by TaskInfo::unwind
size_t unused;

bool on_cpu_task_seen = false;
for (auto& leaf_task : leaf_tasks) {
on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu;

auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, leaf_task.get().is_on_cpu);
auto& stack = stack_info->stack;

for (auto current_task = leaf_task;;) {
auto& task = current_task.get();

// The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames
size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
auto task_stack_size = task.unwind(stack);
if (task.is_on_cpu) {
// Get the "bottom" part of the Python synchronous Stack, that is to say the
// synchronous functions and coroutines called by the Task's outermost coroutine
// The number of Frames to push is the total number of Frames in the Python stack, from which we
// subtract the number of Frames in the "upper Python stack" (asyncio machinery + sync entrypoint)
// This gives us [outermost coroutine, ... , innermost coroutine, outermost sync function, ... ,
// innermost sync function]
// TODO: This may be incorrect if the Task that we know is on CPU does not match the Task that
// actually was on CPU when the Python Thread Stack was captured. One way to work around this
// may be to look at every Task Stack and match it against the Thread Stack. This would be
// somewhat costly though, and so far I have not seen a single instance of this race condition.
size_t frames_to_push =
(python_stack.size() > task_stack_size) ? python_stack.size() - task_stack_size : 0;
for (size_t i = 0; i < frames_to_push; i++) {
Expand Down Expand Up @@ -353,7 +394,7 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate)
// one we saw in TaskInfo::unwind. This is extremely unlikely, I believe, but failing to account for it would
// cause an underflow, so let's be conservative.
size_t start_index = 0;
if (on_cpu_task_seen && python_stack.size() >= upper_python_stack_size) {
if (python_stack.size() >= upper_python_stack_size) {
start_index = python_stack.size() - upper_python_stack_size;
}
for (size_t i = start_index; i < python_stack.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fixes:
- |
profiling: This improves the accuracy of stacks for on-CPU asyncio Tasks by reducing the odds of Frames for a Task
polluting the stack of other Tasks.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pytest


@pytest.mark.xfail(reason="This test is flaky due to a race condition, see PROF-13137")
@pytest.mark.subprocess(
env=dict(
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_recursive_on_cpu_tasks",
Expand Down Expand Up @@ -132,11 +131,13 @@ def loc(f_name: str) -> StackLocation:
loc("<module>"),
loc("main_sync"),
loc("run"),
loc("Runner.run"),
loc("BaseEventLoop.run_until_complete"),
loc("BaseEventLoop.run_forever"),
loc("BaseEventLoop._run_once"),
loc("Handle._run"),
]
+ ([loc(f"{runner_prefix}run")] if PYVERSION >= (3, 11) else [])
+ [
loc(f"{base_event_loop_prefix}run_until_complete"),
loc(f"{base_event_loop_prefix}run_forever"),
loc(f"{base_event_loop_prefix}_run_once"),
loc(f"{handle_prefix}_run"),
# loc("Task-1"),
loc("async_main"),
loc("outer"),
Expand Down
Loading