Skip to content

Commit 7b6ba4f

Browse files
fix(profiling): workaround for on-CPU Task race condition
1 parent 5edd9ff commit 7b6ba4f

File tree

3 files changed

+65
-3
lines changed

3 files changed

+65
-3
lines changed

ddtrace/internal/datadog/profiling/stack_v2/echion/echion/tasks.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ class TaskInfo
201201
}
202202

203203
[[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
204-
inline size_t unwind(FrameStack&, size_t& upper_python_stack_size);
204+
inline Result<size_t> unwind(FrameStack&, size_t& upper_python_stack_size);
205205
};
206206

207207
inline std::unordered_map<PyObject*, PyObject*> task_link_map;
@@ -342,7 +342,7 @@ inline std::vector<std::unique_ptr<StackInfo>> current_tasks;
342342

343343
// ----------------------------------------------------------------------------
344344

345-
inline size_t
345+
inline Result<size_t>
346346
TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
347347
{
348348
// TODO: Check for running task.
@@ -370,6 +370,13 @@ TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
370370
// The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1
371371
upper_python_stack_size = new_frames - 1;
372372

373+
// If the Task is on CPU, then we should have at least the asyncio runtime Frames on top of
374+
// the asynchronous Stack. If we do not have any Python Frames, then the execution state changed
375+
// (race condition) and we cannot recover.
376+
if (this->is_on_cpu && upper_python_stack_size == 0) {
377+
return ErrorKind::TaskInfoError;
378+
}
379+
373380
// Remove the Python Frames from the Stack (they will be added back later)
374381
// We cannot push those Frames now because otherwise they would be added once per Task,
375382
// we only want to add them once per Leaf Task, and on top of all non-leaf Tasks.

ddtrace/internal/datadog/profiling/stack_v2/echion/echion/threads.h

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,36 @@ ThreadInfo::unwind(PyThreadState* tstate)
205205
inline Result<void>
206206
ThreadInfo::unwind_tasks()
207207
{
208+
// Check if the Python stack contains "_run".
209+
// To avoid having to do string comparisons every time we unwind Tasks, we keep track
210+
// of the cache key of the "_run" Frame.
211+
static std::optional<Frame::Key> frame_cache_key;
212+
bool expect_at_least_one_running_task = false;
213+
if (!frame_cache_key) {
214+
for (size_t i = 0; i < python_stack.size(); i++) {
215+
const auto& frame = python_stack[i].get();
216+
const auto& frame_name = string_table.lookup(frame.name)->get();
217+
if (frame_name.size() >= 4 && frame_name.rfind("_run") == frame_name.size() - 4) {
218+
219+
// Although Frames are stored in an LRUCache, the cache key is ALWAYS the same
220+
// even if the Frame gets evicted from the cache.
221+
// This means we can keep the cache key and re-use it to determine
222+
// whether we see the "_run" Frame in the Python stack.
223+
frame_cache_key = frame.cache_key;
224+
expect_at_least_one_running_task = true;
225+
break;
226+
}
227+
}
228+
} else {
229+
for (size_t i = 0; i < python_stack.size(); i++) {
230+
const auto& frame = python_stack[i].get();
231+
if (frame.cache_key == *frame_cache_key) {
232+
expect_at_least_one_running_task = true;
233+
break;
234+
}
235+
}
236+
}
237+
208238
std::vector<TaskInfo::Ref> leaf_tasks;
209239
std::unordered_set<PyObject*> parent_tasks;
210240
std::unordered_map<PyObject*, TaskInfo::Ref> waitee_map; // Indexed by task origin
@@ -217,6 +247,22 @@ ThreadInfo::unwind_tasks()
217247
}
218248

219249
auto all_tasks = std::move(*maybe_all_tasks);
250+
251+
bool saw_at_least_one_running_task = false;
252+
std::string running_task_name;
253+
for (const auto& task_ref : all_tasks) {
254+
const auto& task = task_ref.get();
255+
if (task->is_on_cpu) {
256+
running_task_name = string_table.lookup(task->name)->get();
257+
saw_at_least_one_running_task = true;
258+
break;
259+
}
260+
}
261+
262+
if (saw_at_least_one_running_task != expect_at_least_one_running_task) {
263+
return ErrorKind::TaskInfoError;
264+
}
265+
220266
{
221267
std::lock_guard<std::mutex> lock(task_link_map_lock);
222268

@@ -284,8 +330,13 @@ ThreadInfo::unwind_tasks()
284330
for (auto current_task = leaf_task;;) {
285331
auto& task = current_task.get();
286332

333+
auto maybe_task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
334+
if (!maybe_task_stack_size) {
335+
return ErrorKind::TaskInfoError;
336+
}
337+
287338
// The task_stack_size includes both the coroutines frames and the "upper" Python synchronous frames
288-
size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
339+
size_t task_stack_size = std::move(*maybe_task_stack_size);
289340
if (task.is_on_cpu) {
290341
// Get the "bottom" part of the Python synchronous Stack, that is to say the
291342
// synchronous functions and coroutines called by the Task's outermost coroutine
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
fixes:
2+
- |
3+
profiling: This fix works around a race condition that could occasionally cause incorrect stacks to be reported when
4+
profiling on-CPU asyncio Tasks.

0 commit comments

Comments
 (0)