@@ -209,6 +209,7 @@ ThreadInfo::unwind_tasks()
     std::unordered_set<PyObject*> parent_tasks;
     std::unordered_map<PyObject*, TaskInfo::Ref> waitee_map; // Indexed by task origin
     std::unordered_map<PyObject*, TaskInfo::Ref> origin_map; // Indexed by task origin
+    static std::unordered_set<PyObject*> previous_task_objects;
 
     auto maybe_all_tasks = get_all_tasks(reinterpret_cast<PyObject*>(asyncio_loop));
     if (!maybe_all_tasks) {
@@ -232,14 +233,25 @@ ThreadInfo::unwind_tasks()
             if (all_task_origins.find(kv.first) == all_task_origins.end())
                 to_remove.push_back(kv.first);
         }
-        for (auto key : to_remove)
-            task_link_map.erase(key);
+        for (auto key : to_remove) {
+            // Only remove the link if the Child Task previously existed; otherwise it's a Task that
+            // has just been created and that wasn't in all_tasks when we took the snapshot.
+            if (previous_task_objects.find(key) != previous_task_objects.end()) {
+                task_link_map.erase(key);
+            }
+        }
 
         // Determine the parent tasks from the gather links.
         std::transform(task_link_map.cbegin(),
                        task_link_map.cend(),
                        std::inserter(parent_tasks, parent_tasks.begin()),
                        [](const std::pair<PyObject*, PyObject*>& kv) { return kv.second; });
+
+        // Copy all Task object pointers into previous_task_objects
+        previous_task_objects.clear();
+        for (const auto& task : all_tasks) {
+            previous_task_objects.insert(task->origin);
+        }
     }
 
     for (auto& task : all_tasks) {
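For readers skimming the hunk above: the fix keeps a static set of the task pointers seen on the previous unwind pass, and only prunes a child-to-parent link when the child is both missing from the current all_tasks snapshot and was present in that previous pass. Below is a minimal Python sketch of the same pruning rule; all names and types are illustrative stand-ins, not the actual PyObject* bookkeeping done under the lock in ThreadInfo::unwind_tasks().

# Hedged sketch of the snapshot-based pruning rule from the C++ hunk above.
# Names and types are illustrative only.
previous_task_objects: set[int] = set()  # persists between passes, like the C++ `static` set


def prune_stale_links(task_link_map: dict[int, int], all_task_origins: set[int]) -> None:
    """Drop child -> parent links only for children that are gone from the current
    snapshot AND were already known on the previous pass."""
    global previous_task_objects

    to_remove = [child for child in task_link_map if child not in all_task_origins]
    for child in to_remove:
        # A child missing from the snapshot but never seen before was most likely
        # created after the snapshot was taken, so its link is kept for now.
        if child in previous_task_objects:
            del task_link_map[child]

    # Remember everything seen this pass for the next pruning round.
    previous_task_objects = set(all_task_origins)

Keeping the set as a function-local static in the C++ version lets it persist across unwind passes without widening the ThreadInfo interface.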
@@ -0,0 +1,4 @@
+fixes:
+  - |
+    profiling: This fix resolves a race condition leading to incorrect stacks being reported
+    for asyncio parent/child Tasks (e.g. when using ``asyncio.gather``).
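A minimal, hypothetical workload of the kind this note describes: a parent task that gathers freshly created child tasks, which is the shape of program where child stacks could previously be attributed to the parent. Function names below are illustrative only.

# Hypothetical reproduction sketch for the scenario mentioned in the release note:
# a parent task uses asyncio.gather(), which wraps bare coroutines in child Tasks.
# Children created right around a sampling pass are the case the fix targets.
import asyncio


async def child(i: int) -> int:
    await asyncio.sleep(0.01 * i)  # keep children alive long enough to be sampled
    return i


async def parent() -> None:
    results = await asyncio.gather(*(child(i) for i in range(10)))
    assert results == list(range(10))


if __name__ == "__main__":
    asyncio.run(parent())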
73 changes: 62 additions & 11 deletions tests/profiling/collector/test_asyncio_as_completed.py
@@ -15,9 +15,12 @@ def test_asyncio_as_completed() -> None:
     from sys import version_info as PYVERSION
 
     from ddtrace.internal.datadog.profiling import stack_v2
+    from ddtrace.internal.logger import get_logger
     from ddtrace.profiling import profiler
     from tests.profiling.collector import pprof_utils
 
+    LOG = get_logger(__name__)
+
     assert stack_v2.is_available, stack_v2.failure_msg
 
     async def other(t: float) -> None:
@@ -30,9 +33,12 @@ async def wait_and_return_delay(t: float) -> float:
     async def main() -> None:
         # Create a mix of Tasks and Coroutines
         futures = [
-            asyncio.create_task(wait_and_return_delay(i / 10)) if i % 2 == 0 else wait_and_return_delay(i / 10)
-            for i in range(10)
+            asyncio.create_task(wait_and_return_delay(float(i) / 10))
+            if i % 2 == 0
+            else wait_and_return_delay(float(i) / 10)
+            for i in range(2, 12)
         ]
+        assert len(futures) == 10
 
         # Randomize the order of the futures
         random.shuffle(futures)
@@ -61,6 +67,17 @@ async def main() -> None:
 
     profile = pprof_utils.parse_newest_profile(output_filename)
 
+    task_names_in_profile = sorted(
+        set(
+            [
+                (profile.string_table[label.str])
+                for sample in profile.sample
+                for label in sample.label
+                if profile.string_table[label.key] == "task name"
+            ]
+        )
+    )
+
     samples = pprof_utils.get_samples_with_label_key(profile, "task name")
     assert len(samples) > 0
 
@@ -73,7 +90,7 @@ async def main() -> None:
         pprof_utils.StackLocation(
             function_name="main",
             filename="test_asyncio_as_completed.py",
-            line_no=main.__code__.co_firstlineno + 14,
+            line_no=main.__code__.co_firstlineno + 17,
         ),
     ]
 
@@ -91,11 +108,45 @@ async def main() -> None:
         ),
     ] + locations
 
-    pprof_utils.assert_profile_has_sample(
-        profile,
-        samples,
-        expected_sample=pprof_utils.StackEvent(
-            thread_name="MainThread",
-            locations=locations,
-        ),
-    )
+    # Now, check that we have seen those locations for each Task we've created.
+    # (They should be named Task-2 .. Task-11, which is the automatic name assigned to Tasks by asyncio.create_task)
+    # Note: we expect one Task not to be seen (and thus tolerate a single failure). The reason
+    # is that there is a bug in ddtrace that makes one Task (randomly picked) appear "as part of" the Parent Task,
+    # and this Task thus gets the Parent Task's name and not its own.
+    seen_all_except_one = True
+    seen_task_names: set[str] = set()
+    for i in range(2, 12):
+        try:
+            pprof_utils.assert_profile_has_sample(
+                profile,
+                samples,
+                expected_sample=pprof_utils.StackEvent(
+                    task_name=f"Task-{i}",
+                    thread_name="MainThread",
+                    locations=locations,
+                ),
+            )
+
+            seen_task_names.add(f"Task-{i}")
+        except AssertionError:
+            if not seen_all_except_one:
+                LOG.error(
+                    f"More than one Task has not been seen; i = {i} "  # noqa: G004
+                    f"seen_task_names = {seen_task_names} "
+                    f"task_names_in_profile = {task_names_in_profile}"
+                )
+                raise
+
+            # This is the bug situation.
+            # Check that we have seen the expected locations for the Parent Task (Task-1)
+            # If that isn't the case, then something else is broken.
+            pprof_utils.assert_profile_has_sample(
+                profile,
+                samples,
+                expected_sample=pprof_utils.StackEvent(
+                    task_name="Task-1",
+                    thread_name="MainThread",
+                    locations=locations,
+                ),
+            )
+            seen_all_except_one = False