Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddtrace/internal/datadog/profiling/stack/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def unregister_thread(python_thread_id: int) -> None: ...
# Asyncio support
def track_asyncio_loop(thread_id: int, loop: Optional[asyncio.AbstractEventLoop]) -> None: ...
def link_tasks(parent: asyncio.Task, child: asyncio.Future) -> None: ...
def weak_link_tasks(parent: asyncio.Task, child: asyncio.Future) -> None: ...
def init_asyncio(
current_tasks: Sequence[asyncio.Task],
scheduled_tasks: Sequence[asyncio.Task],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class TaskInfo
};

inline std::unordered_map<PyObject*, PyObject*> task_link_map;
inline std::unordered_map<PyObject*, PyObject*> weak_task_link_map;
inline std::mutex task_link_map_lock;

// ----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include "constants.hpp"
#include "stack_renderer.hpp"

#include "echion/strings.h"

#include <atomic>

namespace Datadog {
Expand Down Expand Up @@ -50,6 +52,7 @@ class Sampler
PyObject* _asyncio_scheduled_tasks,
PyObject* _asyncio_eager_tasks);
void link_tasks(PyObject* parent, PyObject* child);
void weak_link_tasks(PyObject* parent, PyObject* child);
void sampling_thread(const uint64_t seq_num);
void track_greenlet(uintptr_t greenlet_id, StringTable::Key name, PyObject* frame);
void untrack_greenlet(uintptr_t greenlet_id);
Expand Down
38 changes: 38 additions & 0 deletions ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,37 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate)
std::inserter(parent_tasks, parent_tasks.begin()),
[](const std::pair<PyObject*, PyObject*>& kv) { return kv.second; });

// Clean up the weak_task_link_map.
// Remove entries associated to tasks that no longer exist.
all_task_origins.clear();
std::transform(all_tasks.cbegin(),
all_tasks.cend(),
std::inserter(all_task_origins, all_task_origins.begin()),
[](const TaskInfo::Ptr& task) { return task->origin; });

to_remove.clear();
for (auto kv : weak_task_link_map) {
if (all_task_origins.find(kv.first) == all_task_origins.end())
to_remove.push_back(kv.first);
}

for (auto key : to_remove) {
weak_task_link_map.erase(key);
}

// Determine the parent tasks from the gather (strong) links.
for (auto& link : task_link_map) {
auto parent = link.second;

// Check if the parent is actually the child of another Task
auto is_child = weak_task_link_map.find(parent) != weak_task_link_map.end();

// Only insert if we do not know of a Task that created the current Task
if (!is_child) {
parent_tasks.insert(parent);
}
}

// Copy all Task object pointers into previous_task_objects
previous_task_objects.clear();
for (const auto& task : all_tasks) {
Expand Down Expand Up @@ -189,6 +220,13 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate)
}
}

// Check for weak links
if (weak_task_link_map.find(task_origin) != weak_task_link_map.end() &&
origin_map.find(weak_task_link_map[task_origin]) != origin_map.end()) {
current_task = origin_map.find(weak_task_link_map[task_origin])->second;
continue;
}

break;
}

Expand Down
7 changes: 7 additions & 0 deletions ddtrace/internal/datadog/profiling/stack/src/sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,13 @@ Sampler::link_tasks(PyObject* parent, PyObject* child)
task_link_map[child] = parent;
}

void
Sampler::weak_link_tasks(PyObject* parent, PyObject* child)
{
std::lock_guard<std::mutex> guard(task_link_map_lock);
weak_task_link_map[child] = parent;
}

void
Sampler::track_greenlet(uintptr_t greenlet_id, StringTable::Key name, PyObject* frame)
{
Expand Down
18 changes: 18 additions & 0 deletions ddtrace/internal/datadog/profiling/stack/src/stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,23 @@ stack_link_tasks(PyObject* self, PyObject* args)
Py_RETURN_NONE;
}

static PyObject*
stack_weak_link_tasks(PyObject* self, PyObject* args)
{
(void)self;
PyObject *parent, *child;

if (!PyArg_ParseTuple(args, "OO", &parent, &child)) {
return NULL;
}

Py_BEGIN_ALLOW_THREADS;
Sampler::get().weak_link_tasks(parent, child);
Py_END_ALLOW_THREADS;

Py_RETURN_NONE;
}

static PyObject*
stack_set_adaptive_sampling(PyObject* Py_UNUSED(self), PyObject* args)
{
Expand Down Expand Up @@ -285,6 +302,7 @@ static PyMethodDef _stack_methods[] = {
{ "track_asyncio_loop", stack_track_asyncio_loop, METH_VARARGS, "Map the name of a task with its identifier" },
{ "init_asyncio", stack_init_asyncio, METH_VARARGS, "Initialise asyncio tracking" },
{ "link_tasks", stack_link_tasks, METH_VARARGS, "Link two tasks" },
{ "weak_link_tasks", stack_weak_link_tasks, METH_VARARGS, "Weakly link two tasks" },
// greenlet support
{ "track_greenlet", track_greenlet, METH_VARARGS, "Map a greenlet with its identifier" },
{ "untrack_greenlet", untrack_greenlet, METH_VARARGS, "Untrack a terminated greenlet" },
Expand Down
14 changes: 14 additions & 0 deletions ddtrace/profiling/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,20 @@ def _(
# if it times out. The timeout._task is the same as the current task, so there's
# no parent-child relationship to link. The timeout mechanism is handled by the
# event loop's timeout handler, not by creating new tasks.
@partial(wrap, sys.modules["asyncio"].tasks.create_task)
def _(
f: typing.Callable[..., "aio.Task[typing.Any]"],
args: tuple[typing.Any, ...],
kwargs: dict[str, typing.Any],
) -> "aio.Task[typing.Any]":
# kwargs will typically contain context (Python 3.11+ only) and eager_start (Python 3.14+ only)
task: "aio.Task[typing.Any]" = f(*args, **kwargs)
parent: typing.Optional["aio.Task[typing.Any]"] = globals()["current_task"]()

if parent is not None:
stack.weak_link_tasks(parent, task)

return task

_call_init_asyncio(asyncio)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
features:
- |
profiling: The profiler now supports tracking parent-child relationships between asyncio tasks. This will result in
better stacks in flame graphs.
Loading