Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 19 additions & 30 deletions ddtrace/internal/datadog/profiling/stack_v2/echion/echion/tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ class GenInfo
public:
typedef std::unique_ptr<GenInfo> Ptr;

PyObject* origin = NULL;
PyObject* frame = NULL;
PyObject* origin = nullptr;
PyObject* frame = nullptr;

// The coroutine awaited by this coroutine, if any
GenInfo::Ptr await = nullptr;

// Whether the coroutine, or the coroutine it awaits, is currently running (on CPU)
bool is_running = false;

[[nodiscard]] static Result<GenInfo::Ptr> create(PyObject* gen_addr);
Expand Down Expand Up @@ -149,13 +151,21 @@ GenInfo::create(PyObject* gen_addr)
}
}

// A coroutine awaiting another coroutine is never running itself,
// so when the coroutine is awaiting another coroutine, we use the running state of the awaited coroutine.
// Otherwise, we use the running state of the coroutine itself.
bool is_running = false;
if (await) {
is_running = await->is_running;
} else {
#if PY_VERSION_HEX >= 0x030b0000
auto is_running = (gen.gi_frame_state == FRAME_EXECUTING);
is_running = (gen.gi_frame_state == FRAME_EXECUTING);
#elif PY_VERSION_HEX >= 0x030a0000
auto is_running = (frame != NULL) ? _PyFrame_IsExecuting(&f) : false;
is_running = (frame != NULL) ? _PyFrame_IsExecuting(&f) : false;
#else
auto is_running = gen.gi_running;
is_running = gen.gi_running;
#endif
}

recursion_depth--;
return std::make_unique<GenInfo>(gen_addr, frame, std::move(await), is_running);
Expand All @@ -178,7 +188,7 @@ class TaskInfo

// Information to reconstruct the async stack as best as we can
TaskInfo::Ptr waiter = nullptr;
std::optional<bool> is_on_cpu_ = std::nullopt;
bool is_on_cpu = false;

[[nodiscard]] static Result<TaskInfo::Ptr> create(TaskObj*);
TaskInfo(PyObject* origin, PyObject* loop, GenInfo::Ptr coro, StringTable::Key name, TaskInfo::Ptr waiter)
Expand All @@ -187,32 +197,12 @@ class TaskInfo
, coro(std::move(coro))
, name(name)
, waiter(std::move(waiter))
, is_on_cpu(this->coro && this->coro->is_running)
{
}

[[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
inline size_t unwind(FrameStack&);

// Check if any coroutine in the chain is currently running (on CPU)
inline bool is_on_cpu()
{
if (is_on_cpu_.has_value()) {
return *is_on_cpu_;
}

auto* last_coro = static_cast<GenInfo*>(nullptr);
auto* current_coro = this->coro.get();

// Check if the innermost coroutine is running.
// We don't need to test all the other coroutines as they are awaiting, by definition.
while (current_coro) {
last_coro = current_coro;
current_coro = current_coro->await.get();
}

is_on_cpu_ = last_coro && last_coro->is_running;
return *is_on_cpu_;
}
};

inline std::unordered_map<PyObject*, PyObject*> task_link_map;
Expand Down Expand Up @@ -242,8 +232,6 @@ TaskInfo::create(TaskObj* task_addr)
return ErrorKind::TaskInfoGeneratorError;
}

auto origin = reinterpret_cast<PyObject*>(task_addr);

auto maybe_name = string_table.key(task.task_name);
if (!maybe_name) {
recursion_depth--;
Expand All @@ -262,7 +250,8 @@ TaskInfo::create(TaskObj* task_addr)
}

recursion_depth--;
return std::make_unique<TaskInfo>(origin, loop, std::move(*maybe_coro), name, std::move(waiter));
return std::make_unique<TaskInfo>(
reinterpret_cast<PyObject*>(task_addr), loop, std::move(*maybe_coro), name, std::move(waiter));
}

// ----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,25 +252,15 @@ ThreadInfo::unwind_tasks()
}
}

// Only one Task can be on CPU at a time.
// Since determining if a task is on CPU is somewhat costly, we
// stop checking if Tasks are on CPU after seeing the first one.
bool on_cpu_task_seen = false;
for (auto& leaf_task : leaf_tasks) {
bool on_cpu = false;
if (!on_cpu_task_seen) {
on_cpu = leaf_task.get().is_on_cpu();
on_cpu_task_seen = on_cpu;
}

auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, on_cpu);
auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, leaf_task.get().is_on_cpu);
auto& stack = stack_info->stack;
for (auto current_task = leaf_task;;) {
auto& task = current_task.get();

size_t stack_size = task.unwind(stack);

if (on_cpu) {
if (task.is_on_cpu) {
// Undo the stack unwinding
// TODO[perf]: not super-efficient :(
for (size_t i = 0; i < stack_size; i++)
Expand Down
Loading