Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <mutex>
#include <stack>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <echion/config.h>
Expand Down Expand Up @@ -219,7 +220,7 @@ class TaskInfo
}

[[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
inline size_t unwind(FrameStack&, size_t& upper_python_stack_size);
inline Result<size_t> unwind(FrameStack&);
};

inline std::unordered_map<PyObject*, PyObject*> task_link_map;
Expand Down Expand Up @@ -299,11 +300,11 @@ inline std::vector<std::unique_ptr<StackInfo>> current_tasks;

// ----------------------------------------------------------------------------

inline size_t
TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
inline Result<size_t>
TaskInfo::unwind(FrameStack& stack)
{
// TODO: Check for running task.
std::stack<PyObject*> coro_frames;
std::stack<PyObject*, std::vector<PyObject*>> coro_frames;

// Unwind the coro chain
// Detect cycles in the await chain to prevent infinite loops.
Expand Down Expand Up @@ -342,7 +343,15 @@ TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
// use the number of Frames added to the Stack to determine the size of the upper Python stack.
if (count == 0) {
// The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1
upper_python_stack_size = new_frames - 1;
size_t upper_python_stack_size = new_frames - 1;

// If the Task is on CPU, then we should have at least the asyncio runtime Frames on top of
// the asynchronous Stack. If we do not have any Python Frames, then the execution state changed
// (race condition) and we cannot recover (for on-CPU Tasks, we want to see the full execution
// Stack, which we won't if we have a non-running Python Stack).
if (this->is_on_cpu && upper_python_stack_size == 0) {
return ErrorKind::TaskInfoError;
}

// Remove the Python Frames from the Stack (they will be added back later)
// We cannot push those Frames now because otherwise they would be added once per Task,
Expand Down
92 changes: 82 additions & 10 deletions ddtrace/internal/datadog/profiling/stack/echion/echion/threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
#endif

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <functional>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <unordered_set>

#if defined PL_LINUX
#include <time.h>
Expand Down Expand Up @@ -219,6 +221,57 @@ ThreadInfo::unwind(PyThreadState* tstate)
inline Result<void>
ThreadInfo::unwind_tasks(PyThreadState* tstate)
{
// The size of the "pure Python" stack (before asyncio Frames), computed later by walking the Python Stack
size_t upper_python_stack_size = 0;

// Check if the Python stack contains "_run".
// To avoid having to do string comparisons every time we unwind Tasks, we keep track
// of the cache key of the "_run" Frame.
static std::optional<Frame::Key> frame_cache_key;
bool expect_at_least_one_running_task = false;
if (!frame_cache_key) {
for (size_t i = 0; i < python_stack.size(); i++) {
const auto& frame = python_stack[i].get();
const auto& frame_name = string_table.lookup(frame.name)->get();

#if PY_VERSION_HEX >= 0x030b0000
// After Python 3.11, function names in Frames are qualified with e.g. the class name, so we
// can use the qualified name to identify the "_run" Frame.
constexpr std::string_view _run = "Handle._run";
auto is_run_frame = frame_name.size() >= _run.size() && frame_name == _run;
#else
// Before Python 3.11, function names in Frames are not qualified, so we
// can use the filename to identify the "_run" Frame.
constexpr std::string_view asyncio_runners_py = "asyncio/events.py";
constexpr std::string_view _run = "_run";
auto filename = string_table.lookup(frame.filename)->get();
auto is_asyncio = filename.size() >= asyncio_runners_py.size() &&
filename.rfind(asyncio_runners_py) == filename.size() - asyncio_runners_py.size();
auto is_run_frame = is_asyncio && (frame_name.size() >= _run.size() &&
frame_name.rfind(_run) == frame_name.size() - _run.size());
#endif
if (is_run_frame) {
// Although Frames are stored in an LRUCache, the cache key is ALWAYS the same
// even if the Frame gets evicted from the cache.
// This means we can keep the cache key and re-use it to determine
// whether we see the "_run" Frame in the Python stack.
frame_cache_key = frame.cache_key;
expect_at_least_one_running_task = true;
upper_python_stack_size = python_stack.size() - i;
break;
}
}
} else {
for (size_t i = 0; i < python_stack.size(); i++) {
const auto& frame = python_stack[i].get();
if (frame.cache_key == *frame_cache_key) {
expect_at_least_one_running_task = true;
upper_python_stack_size = python_stack.size() - i;
break;
}
}
}

std::vector<TaskInfo::Ref> leaf_tasks;
std::unordered_set<PyObject*> parent_tasks;
std::unordered_map<PyObject*, TaskInfo::Ref> waitee_map; // Indexed by task origin
Expand All @@ -231,6 +284,27 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate)
}

auto all_tasks = std::move(*maybe_all_tasks);

// Guard against the critical case where a Task is marked as running (on-CPU) but the Python Stack
// doesn't show the asyncio runtime frames. This would cause incorrect frame popping.
// The opposite case (Stack shows runtime frames but no task is marked as running) is less critical
// and can happen when the frame detection doesn't match the specific asyncio implementation,
// especially in Python 3.10 where we look for "_run" in "asyncio/events.py" but the actual
// stack may show "_run_once" instead.
bool at_least_one_running_task_seen = false;
for (const auto& task_ref : all_tasks) {
const auto& task = task_ref.get();
if (task->is_on_cpu) {
at_least_one_running_task_seen = true;
break;
}
}

// Only error if a task is running but the stack doesn't show it (critical case)
if (at_least_one_running_task_seen && !expect_at_least_one_running_task) {
return ErrorKind::TaskInfoError;
}

{
std::lock_guard<std::mutex> lock(task_link_map_lock);

Expand Down Expand Up @@ -288,15 +362,7 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate)
}
}

// The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind
size_t upper_python_stack_size = 0;
// Unused variable, will be used later by TaskInfo::unwind
size_t unused;

bool on_cpu_task_seen = false;
for (auto& leaf_task : leaf_tasks) {
on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu;

auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, leaf_task.get().is_on_cpu);
auto& stack = stack_info->stack;

Expand All @@ -310,8 +376,14 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate)
}
seen_task_origins.insert(task.origin);

auto maybe_task_stack_size = task.unwind(stack);
if (!maybe_task_stack_size) {
// Skip the current Task Stack (Leaf Task to top)
break;
}

// The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames
auto task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
auto task_stack_size = *maybe_task_stack_size;
if (task.is_on_cpu) {
// Get the "bottom" part of the Python synchronous Stack, that is to say the
// synchronous functions and coroutines called by the Task's outermost coroutine
Expand Down Expand Up @@ -360,7 +432,7 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate)
// one we saw in TaskInfo::unwind. This is extremely unlikely, I believe, but failing to account for it would
// cause an underflow, so let's be conservative.
size_t start_index = 0;
if (on_cpu_task_seen && python_stack.size() >= upper_python_stack_size) {
if (expect_at_least_one_running_task && python_stack.size() >= upper_python_stack_size) {
start_index = python_stack.size() - upper_python_stack_size;
}
for (size_t i = start_index; i < python_stack.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fixes:
- |
profiling: This fix works around a race condition leading to incorrect stacks sometimes being reported when
profiling on-CPU asyncio Tasks.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pytest


@pytest.mark.xfail(reason="This test is flaky due to a race condition, see PROF-13137")
@pytest.mark.subprocess(
env=dict(
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_recursive_on_cpu_tasks",
Expand Down Expand Up @@ -132,11 +131,13 @@ def loc(f_name: str) -> StackLocation:
loc("<module>"),
loc("main_sync"),
loc("run"),
loc("Runner.run"),
loc("BaseEventLoop.run_until_complete"),
loc("BaseEventLoop.run_forever"),
loc("BaseEventLoop._run_once"),
loc("Handle._run"),
]
+ ([loc(f"{runner_prefix}run")] if PYVERSION >= (3, 11) else [])
+ [
loc(f"{base_event_loop_prefix}run_until_complete"),
loc(f"{base_event_loop_prefix}run_forever"),
loc(f"{base_event_loop_prefix}_run_once"),
loc(f"{handle_prefix}_run"),
# loc("Task-1"),
loc("async_main"),
loc("outer"),
Expand Down
Loading