diff --git a/ddtrace/internal/datadog/profiling/stack_v2/echion/echion/threads.h b/ddtrace/internal/datadog/profiling/stack_v2/echion/echion/threads.h
index defaed8957e..9d2a727fbd8 100644
--- a/ddtrace/internal/datadog/profiling/stack_v2/echion/echion/threads.h
+++ b/ddtrace/internal/datadog/profiling/stack_v2/echion/echion/threads.h
@@ -209,6 +209,7 @@ ThreadInfo::unwind_tasks()
     std::unordered_set<PyObject*> parent_tasks;
     std::unordered_map<PyObject*, TaskInfo::Ref> waitee_map;  // Indexed by task origin
     std::unordered_map<PyObject*, TaskInfo::Ref> origin_map;  // Indexed by task origin
+    static std::unordered_set<PyObject*> previous_task_objects;
 
     auto maybe_all_tasks = get_all_tasks(reinterpret_cast<PyObject*>(asyncio_loop));
     if (!maybe_all_tasks) {
@@ -232,14 +233,25 @@ ThreadInfo::unwind_tasks()
             if (all_task_origins.find(kv.first) == all_task_origins.end())
                 to_remove.push_back(kv.first);
         }
-        for (auto key : to_remove)
-            task_link_map.erase(key);
+        for (auto key : to_remove) {
+            // Only remove the link if the Child Task previously existed; otherwise it's a Task that
+            // has just been created and that wasn't in all_tasks when we took the snapshot.
+            if (previous_task_objects.find(key) != previous_task_objects.end()) {
+                task_link_map.erase(key);
+            }
+        }
 
         // Determine the parent tasks from the gather links.
         std::transform(task_link_map.cbegin(),
                        task_link_map.cend(),
                        std::inserter(parent_tasks, parent_tasks.begin()),
                        [](const std::pair<PyObject*, PyObject*>& kv) { return kv.second; });
+
+        // Copy all Task object pointers into previous_task_objects
+        previous_task_objects.clear();
+        for (const auto& task : all_tasks) {
+            previous_task_objects.insert(task->origin);
+        }
     }
 
     for (auto& task : all_tasks) {
diff --git a/releasenotes/notes/profiling-fix-task-link-race-condition-891ac5c86af8a7c1.yaml b/releasenotes/notes/profiling-fix-task-link-race-condition-891ac5c86af8a7c1.yaml
new file mode 100644
index 00000000000..0caab9b4044
--- /dev/null
+++ b/releasenotes/notes/profiling-fix-task-link-race-condition-891ac5c86af8a7c1.yaml
@@ -0,0 +1,4 @@
+fixes:
+  - |
+    profiling: This fix resolves a race condition leading to incorrect stacks being reported
+    for asyncio parent/child Tasks (e.g. when using ``asyncio.gather``).
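A minimal sketch of the snapshot-guard pattern introduced in the hunk above, in Python for illustration: integer ids stand in for the `PyObject*` task origins, and `prune_links` is a hypothetical stand-in for the pruning section of `ThreadInfo::unwind_tasks`. The point is that a link for a just-created child must survive until the child has appeared in at least one task snapshot; it is only pruned once the child was seen on a previous pass and has since disappeared.

```python
# Hypothetical illustration of the snapshot guard; not the echion implementation.
previous_tasks: set[int] = set()


def prune_links(task_link_map: dict[int, int], current_tasks: set[int]) -> None:
    """Drop child->parent links whose child is gone, but only if the child
    was already present in the snapshot taken on the previous pass."""
    global previous_tasks
    stale = [child for child in task_link_map if child not in current_tasks]
    for child in stale:
        # A brand-new child may simply not have made it into current_tasks yet;
        # only drop links whose child was seen before.
        if child in previous_tasks:
            del task_link_map[child]
    previous_tasks = set(current_tasks)


links = {1: 0, 2: 0}
prune_links(links, {1})     # task 2 just created, not yet in the snapshot: link kept
assert len(links) == 2
prune_links(links, {1, 2})  # task 2 now visible; nothing pruned
prune_links(links, {1})     # task 2 finished and was seen before: link pruned
assert len(links) == 1
```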
diff --git a/tests/profiling/collector/test_asyncio_as_completed.py b/tests/profiling/collector/test_asyncio_as_completed.py
index c6915753172..67d237f4387 100644
--- a/tests/profiling/collector/test_asyncio_as_completed.py
+++ b/tests/profiling/collector/test_asyncio_as_completed.py
@@ -15,9 +15,12 @@ def test_asyncio_as_completed() -> None:
     from sys import version_info as PYVERSION
 
     from ddtrace.internal.datadog.profiling import stack_v2
+    from ddtrace.internal.logger import get_logger
     from ddtrace.profiling import profiler
     from tests.profiling.collector import pprof_utils
 
+    LOG = get_logger(__name__)
+
     assert stack_v2.is_available, stack_v2.failure_msg
 
     async def other(t: float) -> None:
@@ -30,9 +33,12 @@ async def wait_and_return_delay(t: float) -> float:
     async def main() -> None:
         # Create a mix of Tasks and Coroutines
         futures = [
-            asyncio.create_task(wait_and_return_delay(i / 10)) if i % 2 == 0 else wait_and_return_delay(i / 10)
-            for i in range(10)
+            asyncio.create_task(wait_and_return_delay(float(i) / 10))
+            if i % 2 == 0
+            else wait_and_return_delay(float(i) / 10)
+            for i in range(2, 12)
         ]
+        assert len(futures) == 10
 
         # Randomize the order of the futures
         random.shuffle(futures)
@@ -61,6 +67,17 @@ async def main() -> None:
 
     profile = pprof_utils.parse_newest_profile(output_filename)
 
+    task_names_in_profile = sorted(
+        set(
+            [
+                (profile.string_table[label.str])
+                for sample in profile.sample
+                for label in sample.label
+                if profile.string_table[label.key] == "task name"
+            ]
+        )
+    )
+
     samples = pprof_utils.get_samples_with_label_key(profile, "task name")
     assert len(samples) > 0
 
@@ -73,7 +90,7 @@ async def main() -> None:
         pprof_utils.StackLocation(
             function_name="main",
             filename="test_asyncio_as_completed.py",
-            line_no=main.__code__.co_firstlineno + 14,
+            line_no=main.__code__.co_firstlineno + 17,
         ),
     ]
 
@@ -91,11 +108,45 @@ async def main() -> None:
         ),
     ] + locations
 
-    pprof_utils.assert_profile_has_sample(
-        profile,
-        samples,
-        expected_sample=pprof_utils.StackEvent(
-            thread_name="MainThread",
-            locations=locations,
-        ),
-    )
+    # Now, check that we have seen those locations for each Task we've created.
+    # (They should be named Task-2 .. Task-11, which is the automatic name assigned to Tasks by asyncio.create_task)
+    # Note: we expect one Task to not be seen (and thus accept to recover from one failure). The reason
+    # is that there is a bug in ddtrace that makes one Task (randomly picked) appear "as part of" the Parent Task,
+    # and this Task thus gets the Parent Task's name and not its own.
+    seen_all_except_one = True
+    seen_task_names: set[str] = set()
+    for i in range(2, 12):
+        try:
+            pprof_utils.assert_profile_has_sample(
+                profile,
+                samples,
+                expected_sample=pprof_utils.StackEvent(
+                    task_name=f"Task-{i}",
+                    thread_name="MainThread",
+                    locations=locations,
+                ),
+            )
+
+            seen_task_names.add(f"Task-{i}")
+        except AssertionError:
+            if not seen_all_except_one:
+                LOG.error(
+                    f"More than one Task has not been seen; i = {i} "  # noqa: G004
+                    f"seen_task_names = {seen_task_names} "
+                    f"task_names_in_profile = {task_names_in_profile}"
+                )
+                raise
+
+            # This is the bug situation.
+            # Check that we have seen the expected locations for the Parent Task (Task-1)
+            # If that isn't the case, then something else is broken.
+            pprof_utils.assert_profile_has_sample(
+                profile,
+                samples,
+                expected_sample=pprof_utils.StackEvent(
+                    task_name="Task-1",
+                    thread_name="MainThread",
+                    locations=locations,
+                ),
+            )
+            seen_all_except_one = False
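The fallback to `Task-1` in the test rests on asyncio's automatic task naming: in a fresh interpreter, `asyncio.run()` wraps the entry coroutine in a task named `Task-1`, and every subsequent task, whether created explicitly with `asyncio.create_task()` or implicitly when `as_completed()` wraps a bare coroutine via `ensure_future()`, takes the next value of a process-wide counter. A minimal sketch, assuming a fresh interpreter so the counter starts at 1:

```python
import asyncio


async def child() -> None:
    # Each Task gets the next auto-generated "Task-<N>" name
    # unless a name is passed explicitly.
    print(asyncio.current_task().get_name())


async def main() -> None:
    # asyncio.run() wrapped main() in the first task.
    print(asyncio.current_task().get_name())  # Task-1
    tasks = [asyncio.create_task(child()) for _ in range(3)]  # Task-2 .. Task-4
    await asyncio.gather(*tasks)


asyncio.run(main())
```

This is why the test expects the ten futures built with `range(2, 12)` to surface as `Task-2` through `Task-11` in the profile, with `Task-1` (the parent) absorbing at most one mislabeled child.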