diff --git a/ddtrace/internal/datadog/profiling/stack/__init__.pyi b/ddtrace/internal/datadog/profiling/stack/__init__.pyi index fb58d23dfe2..1cdb9d90080 100644 --- a/ddtrace/internal/datadog/profiling/stack/__init__.pyi +++ b/ddtrace/internal/datadog/profiling/stack/__init__.pyi @@ -25,6 +25,7 @@ def unregister_thread(python_thread_id: int) -> None: ... # Asyncio support def track_asyncio_loop(thread_id: int, loop: Optional[asyncio.AbstractEventLoop]) -> None: ... def link_tasks(parent: asyncio.Task, child: asyncio.Future) -> None: ... +def weak_link_tasks(parent: asyncio.Task, child: asyncio.Future) -> None: ... def init_asyncio( current_tasks: Sequence[asyncio.Task], scheduled_tasks: Sequence[asyncio.Task], diff --git a/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h b/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h index b0d06e223e4..02dc1e621bb 100644 --- a/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h +++ b/ddtrace/internal/datadog/profiling/stack/echion/echion/tasks.h @@ -142,6 +142,7 @@ class TaskInfo }; inline std::unordered_map task_link_map; +inline std::unordered_map weak_task_link_map; inline std::mutex task_link_map_lock; // ---------------------------------------------------------------------------- diff --git a/ddtrace/internal/datadog/profiling/stack/include/sampler.hpp b/ddtrace/internal/datadog/profiling/stack/include/sampler.hpp index 08e76ffc5fb..5e7e8970ac4 100644 --- a/ddtrace/internal/datadog/profiling/stack/include/sampler.hpp +++ b/ddtrace/internal/datadog/profiling/stack/include/sampler.hpp @@ -2,6 +2,8 @@ #include "constants.hpp" #include "stack_renderer.hpp" +#include "echion/strings.h" + #include namespace Datadog { @@ -50,6 +52,7 @@ class Sampler PyObject* _asyncio_scheduled_tasks, PyObject* _asyncio_eager_tasks); void link_tasks(PyObject* parent, PyObject* child); + void weak_link_tasks(PyObject* parent, PyObject* child); void sampling_thread(const uint64_t seq_num); void track_greenlet(uintptr_t greenlet_id, StringTable::Key name, PyObject* frame); void untrack_greenlet(uintptr_t greenlet_id); diff --git a/ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc b/ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc index d36ed99c432..f2bac74027b 100644 --- a/ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc +++ b/ddtrace/internal/datadog/profiling/stack/src/echion/threads.cc @@ -113,6 +113,37 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) std::inserter(parent_tasks, parent_tasks.begin()), [](const std::pair& kv) { return kv.second; }); + // Clean up the weak_task_link_map. + // Remove entries associated to tasks that no longer exist. + all_task_origins.clear(); + std::transform(all_tasks.cbegin(), + all_tasks.cend(), + std::inserter(all_task_origins, all_task_origins.begin()), + [](const TaskInfo::Ptr& task) { return task->origin; }); + + to_remove.clear(); + for (auto kv : weak_task_link_map) { + if (all_task_origins.find(kv.first) == all_task_origins.end()) + to_remove.push_back(kv.first); + } + + for (auto key : to_remove) { + weak_task_link_map.erase(key); + } + + // Determine the parent tasks from the gather (strong) links. + for (auto& link : task_link_map) { + auto parent = link.second; + + // Check if the parent is actually the child of another Task + auto is_child = weak_task_link_map.find(parent) != weak_task_link_map.end(); + + // Only insert if we do not know of a Task that created the current Task + if (!is_child) { + parent_tasks.insert(parent); + } + } + // Copy all Task object pointers into previous_task_objects previous_task_objects.clear(); for (const auto& task : all_tasks) { @@ -189,6 +220,13 @@ ThreadInfo::unwind_tasks(PyThreadState* tstate) } } + // Check for weak links + if (weak_task_link_map.find(task_origin) != weak_task_link_map.end() && + origin_map.find(weak_task_link_map[task_origin]) != origin_map.end()) { + current_task = origin_map.find(weak_task_link_map[task_origin])->second; + continue; + } + break; } diff --git a/ddtrace/internal/datadog/profiling/stack/src/sampler.cpp b/ddtrace/internal/datadog/profiling/stack/src/sampler.cpp index 05a1f93fd65..d1a54624993 100644 --- a/ddtrace/internal/datadog/profiling/stack/src/sampler.cpp +++ b/ddtrace/internal/datadog/profiling/stack/src/sampler.cpp @@ -379,6 +379,13 @@ Sampler::link_tasks(PyObject* parent, PyObject* child) task_link_map[child] = parent; } +void +Sampler::weak_link_tasks(PyObject* parent, PyObject* child) +{ + std::lock_guard guard(task_link_map_lock); + weak_task_link_map[child] = parent; +} + void Sampler::track_greenlet(uintptr_t greenlet_id, StringTable::Key name, PyObject* frame) { diff --git a/ddtrace/internal/datadog/profiling/stack/src/stack.cpp b/ddtrace/internal/datadog/profiling/stack/src/stack.cpp index fbc1e6105e8..5bed423a021 100644 --- a/ddtrace/internal/datadog/profiling/stack/src/stack.cpp +++ b/ddtrace/internal/datadog/profiling/stack/src/stack.cpp @@ -186,6 +186,23 @@ stack_link_tasks(PyObject* self, PyObject* args) Py_RETURN_NONE; } +static PyObject* +stack_weak_link_tasks(PyObject* self, PyObject* args) +{ + (void)self; + PyObject *parent, *child; + + if (!PyArg_ParseTuple(args, "OO", &parent, &child)) { + return NULL; + } + + Py_BEGIN_ALLOW_THREADS; + Sampler::get().weak_link_tasks(parent, child); + Py_END_ALLOW_THREADS; + + Py_RETURN_NONE; +} + static PyObject* stack_set_adaptive_sampling(PyObject* Py_UNUSED(self), PyObject* args) { @@ -285,6 +302,7 @@ static PyMethodDef _stack_methods[] = { { "track_asyncio_loop", stack_track_asyncio_loop, METH_VARARGS, "Map the name of a task with its identifier" }, { "init_asyncio", stack_init_asyncio, METH_VARARGS, "Initialise asyncio tracking" }, { "link_tasks", stack_link_tasks, METH_VARARGS, "Link two tasks" }, + { "weak_link_tasks", stack_weak_link_tasks, METH_VARARGS, "Weakly link two tasks" }, // greenlet support { "track_greenlet", track_greenlet, METH_VARARGS, "Map a greenlet with its identifier" }, { "untrack_greenlet", untrack_greenlet, METH_VARARGS, "Untrack a terminated greenlet" }, diff --git a/ddtrace/profiling/_asyncio.py b/ddtrace/profiling/_asyncio.py index 6dfa9f5da24..243985657c7 100644 --- a/ddtrace/profiling/_asyncio.py +++ b/ddtrace/profiling/_asyncio.py @@ -234,6 +234,20 @@ def _( # if it times out. The timeout._task is the same as the current task, so there's # no parent-child relationship to link. The timeout mechanism is handled by the # event loop's timeout handler, not by creating new tasks. + @partial(wrap, sys.modules["asyncio"].tasks.create_task) + def _( + f: typing.Callable[..., "aio.Task[typing.Any]"], + args: tuple[typing.Any, ...], + kwargs: dict[str, typing.Any], + ) -> "aio.Task[typing.Any]": + # kwargs will typically contain context (Python 3.11+ only) and eager_start (Python 3.14+ only) + task: "aio.Task[typing.Any]" = f(*args, **kwargs) + parent: typing.Optional["aio.Task[typing.Any]"] = globals()["current_task"]() + + if parent is not None: + stack.weak_link_tasks(parent, task) + + return task _call_init_asyncio(asyncio) diff --git a/releasenotes/notes/profiling-support-for-weak-links-48427bed4033c2f6.yaml b/releasenotes/notes/profiling-support-for-weak-links-48427bed4033c2f6.yaml new file mode 100644 index 00000000000..a5b9007c921 --- /dev/null +++ b/releasenotes/notes/profiling-support-for-weak-links-48427bed4033c2f6.yaml @@ -0,0 +1,4 @@ +features: + - | + profiling: The profiler now supports tracking parent-child relationships between asyncio tasks. This will result in + better stacks in flame graphs. diff --git a/tests/profiling/collector/test_stack_asyncio.py b/tests/profiling/collector/test_asyncio_import_order.py similarity index 86% rename from tests/profiling/collector/test_stack_asyncio.py rename to tests/profiling/collector/test_asyncio_import_order.py index c2844bbf1fd..b6059ad88ee 100644 --- a/tests/profiling/collector/test_stack_asyncio.py +++ b/tests/profiling/collector/test_asyncio_import_order.py @@ -1,125 +1,6 @@ import pytest -@pytest.mark.subprocess( - env=dict( - DD_PROFILING_OUTPUT_PPROF="/tmp/test_stack_asyncio", - ), - err=None, -) -# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test) -def test_asyncio(): - import asyncio - import os - import time - import uuid - - from ddtrace import ext - from ddtrace.internal.datadog.profiling import stack - from ddtrace.profiling import profiler - from ddtrace.trace import tracer - from tests.profiling.collector import pprof_utils - - assert stack.is_available, stack.failure_msg - - sleep_time = 0.2 - loop_run_time = 3 - - async def stuff() -> None: - start_time = time.time() - while time.time() < start_time + loop_run_time: - await asyncio.sleep(sleep_time) - - async def hello(): - t1 = asyncio.create_task(stuff(), name="sleep 1") - t2 = asyncio.create_task(stuff(), name="sleep 2") - await stuff() - return (t1, t2) - - resource = str(uuid.uuid4()) - span_type = ext.SpanTypes.WEB - - p = profiler.Profiler(tracer=tracer) - p.start() - with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span: - span_id = span.span_id - local_root_span_id = span._local_root.span_id - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - maintask = loop.create_task(hello(), name="main") - - t1, t2 = loop.run_until_complete(maintask) - p.stop() - - t1_name = t1.get_name() - t2_name = t2.get_name() - - assert t1_name == "sleep 1" - assert t2_name == "sleep 2" - - output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid()) - - profile = pprof_utils.parse_newest_profile(output_filename) - - samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id") - assert len(samples_with_span_id) > 0 - - # get samples with task_name - samples = pprof_utils.get_samples_with_label_key(profile, "task name") - # The next fails if stack is not properly configured with asyncio task - # tracking via ddtrace.profiling._asyncio - assert len(samples) > 0 - - pprof_utils.assert_profile_has_sample( - profile, - samples, - expected_sample=pprof_utils.StackEvent( - thread_name="MainThread", - task_name="main", - span_id=span_id, - local_root_span_id=local_root_span_id, - locations=[ - pprof_utils.StackLocation( - function_name="hello", filename="test_stack_asyncio.py", line_no=hello.__code__.co_firstlineno + 3 - ) - ], - ), - ) - - pprof_utils.assert_profile_has_sample( - profile, - samples, - expected_sample=pprof_utils.StackEvent( - thread_name="MainThread", - task_name=t1_name, - span_id=span_id, - local_root_span_id=local_root_span_id, - locations=[ - pprof_utils.StackLocation( - function_name="stuff", filename="test_stack_asyncio.py", line_no=stuff.__code__.co_firstlineno + 3 - ), - ], - ), - ) - - pprof_utils.assert_profile_has_sample( - profile, - samples, - expected_sample=pprof_utils.StackEvent( - thread_name="MainThread", - task_name=t2_name, - span_id=span_id, - local_root_span_id=local_root_span_id, - locations=[ - pprof_utils.StackLocation( - function_name="stuff", filename="test_stack_asyncio.py", line_no=stuff.__code__.co_firstlineno + 3 - ), - ], - ), - ) - - @pytest.mark.subprocess( env=dict( DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_start_profiler_from_process_before_importing_asyncio", @@ -127,7 +8,7 @@ async def hello(): err=None, ) # For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test) -def test_asyncio_start_profiler_from_process_before_importing_asyncio(): +def test_asyncio_start_profiler_from_process_before_importing_asyncio() -> None: from ddtrace.internal.datadog.profiling import stack from ddtrace.profiling import profiler @@ -217,6 +98,7 @@ async def main_task(): ), ], ), + print_samples_on_failure=True, ) # Verify specific tasks are in the profile @@ -240,6 +122,7 @@ async def main_task(): ) ], ), + print_samples_on_failure=True, ) @@ -250,7 +133,7 @@ async def main_task(): err=None, ) # For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test) -def test_asyncio_start_profiler_from_process_before_starting_loop(): +def test_asyncio_start_profiler_from_process_before_starting_loop() -> None: import asyncio import os import sys @@ -325,6 +208,13 @@ async def main_task(): EXPECTED_FILENAME_BACKGROUND = os.path.basename(background_task_def.__code__.co_filename) EXPECTED_LINE_NO_BACKGROUND = -1 # any line + # Verify specific tasks are in the profile + if sys.version_info >= (3, 11): + EXPECTED_FUNCTION_NAME_TRACKED = f"{my_function.__name__}..{tracked_task_def.__name__}" + else: + EXPECTED_FUNCTION_NAME_TRACKED = tracked_task_def.__name__ + EXPECTED_FILENAME_TRACKED = os.path.basename(tracked_task_def.__code__.co_filename) + pprof_utils.assert_profile_has_sample( profile, samples, @@ -339,15 +229,9 @@ async def main_task(): ), ], ), + print_samples_on_failure=True, ) - # Verify specific tasks are in the profile - if sys.version_info >= (3, 11): - EXPECTED_FUNCTION_NAME_TRACKED = f"{my_function.__name__}..{tracked_task_def.__name__}" - else: - EXPECTED_FUNCTION_NAME_TRACKED = tracked_task_def.__name__ - EXPECTED_FILENAME_TRACKED = os.path.basename(tracked_task_def.__code__.co_filename) - pprof_utils.assert_profile_has_sample( profile, samples, @@ -362,10 +246,11 @@ async def main_task(): ) ], ), + print_samples_on_failure=True, ) -@pytest.mark.xfail(reason="This test fails because there's no way to get the current loop if it's not already running.") +@pytest.mark.xfail(reason="No way to get the current loop if it is set but not running.") @pytest.mark.subprocess( env=dict( DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_start_profiler_from_process_after_creating_loop", @@ -373,7 +258,7 @@ async def main_task(): err=None, ) # For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test) -def test_asyncio_start_profiler_from_process_after_creating_loop(): +def test_asyncio_start_profiler_from_process_after_creating_loop() -> None: import asyncio import os import sys @@ -462,6 +347,7 @@ async def main_task(): ), ], ), + print_samples_on_failure=True, ) # Verify specific tasks are in the profile @@ -486,6 +372,7 @@ async def main_task(): ) ], ), + print_samples_on_failure=True, ) @@ -497,7 +384,7 @@ async def main_task(): err=None, ) # For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test) -def test_asyncio_import_profiler_from_process_after_starting_loop(): +def test_asyncio_import_profiler_from_process_after_starting_loop() -> None: import asyncio import os import sys @@ -587,6 +474,7 @@ async def main_task(): ), ], ), + print_samples_on_failure=True, ) # Verify specific tasks are in the profile @@ -611,6 +499,7 @@ async def main_task(): ) ], ), + print_samples_on_failure=True, ) @@ -620,7 +509,7 @@ async def main_task(): ), err=None, ) -def test_asyncio_start_profiler_from_process_after_task_start(): +def test_asyncio_start_profiler_from_process_after_task_start() -> None: # NOW import profiling modules - this should track the existing loop import asyncio import os @@ -750,7 +639,6 @@ async def main_task(): samples, expected_sample=pprof_utils.StackEvent( thread_name="MainThread", - task_name=t1_name, locations=[ pprof_utils.StackLocation( function_name=EXPECTED_FUNCTION_NAME_TRACKED, @@ -759,7 +647,6 @@ async def main_task(): ) ], ), - print_samples_on_failure=True, ) @@ -769,7 +656,7 @@ async def main_task(): ), err=None, ) -def test_asyncio_import_and_start_profiler_from_process_after_task_start(): +def test_asyncio_import_and_start_profiler_from_process_after_task_start() -> None: import asyncio import os import sys @@ -863,6 +750,7 @@ async def main_task(): ), ], ), + print_samples_on_failure=True, ) # Verify specific tasks are in the profile @@ -877,7 +765,6 @@ async def main_task(): samples, expected_sample=pprof_utils.StackEvent( thread_name="MainThread", - task_name=t1_name, locations=[ pprof_utils.StackLocation( function_name=EXPECTED_FUNCTION_NAME_TRACKED, @@ -886,4 +773,5 @@ async def main_task(): ) ], ), + print_samples_on_failure=True, ) diff --git a/tests/profiling/collector/test_asyncio_weak_links.py b/tests/profiling/collector/test_asyncio_weak_links.py new file mode 100644 index 00000000000..b26648c5013 --- /dev/null +++ b/tests/profiling/collector/test_asyncio_weak_links.py @@ -0,0 +1,99 @@ +import pytest + + +@pytest.mark.subprocess( + env=dict( + DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_weak_links", + ), + err=None, +) +# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test) +def test_asyncio_weak_links_wall_time() -> None: + import asyncio + import os + + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + from tests.profiling.collector import pprof_utils + + assert stack.is_available, stack.failure_msg + + async def func_not_awaited() -> None: + await asyncio.sleep(0.5) + + async def func_awaited() -> None: + await asyncio.sleep(1) + + async def parent() -> asyncio.Task: + await asyncio.sleep(0.5) + + t_not_awaited = asyncio.create_task(func_not_awaited(), name="Task-not_awaited") + t_awaited = asyncio.create_task(func_awaited(), name="Task-awaited") + + await t_awaited + + # At this point, we have not awaited t_not_awaited but it should have finished + # before t_awaited as the delay is much shorter. + # Returning it to avoid the warning on unused variable. + return t_not_awaited + + async def main() -> None: + await parent() + + p = profiler.Profiler() + p.start() + + asyncio.run(main()) + + p.stop() + + output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid()) + + profile = pprof_utils.parse_newest_profile(output_filename) + + samples = pprof_utils.get_samples_with_label_key(profile, "task name") + assert len(samples) > 0 + + def loc(fn: str, file: str = "", line: int = -1) -> pprof_utils.StackLocation: + return pprof_utils.StackLocation( + function_name=fn, + filename=file, + line_no=line, + ) + + # We should see a stack for (Task-1) / parent / (Task-not_awaited) / not_awaited / sleep + # Even though Task-1 does not await Task-not_awaited, the fact that there is a weak (parent - child) link + # means that Task-not_awaited is under Task-1. + pprof_utils.assert_profile_has_sample( + profile, + samples, + expected_sample=pprof_utils.StackEvent( + thread_name="MainThread", + task_name="Task-not_awaited", + locations=[ + loc("sleep"), + loc("func_not_awaited", "test_asyncio_weak_links.py", func_not_awaited.__code__.co_firstlineno + 1), + # loc("Task-not_awaited"), + loc("parent", "test_asyncio_weak_links.py", parent.__code__.co_firstlineno + 6), + # loc("Task-1"), + ], + ), + ) + + # We should see a stack for Task-1 / parent / Task-awaited / awaited / sleep + # That is because Task-1 is awaiting Task-awaited. + pprof_utils.assert_profile_has_sample( + profile, + samples, + expected_sample=pprof_utils.StackEvent( + thread_name="MainThread", + task_name="Task-awaited", + locations=[ + loc("sleep"), + loc("func_awaited", "test_asyncio_weak_links.py", func_awaited.__code__.co_firstlineno + 1), + # loc("Task-awaited"), + loc("parent", "test_asyncio_weak_links.py", parent.__code__.co_firstlineno + 6), + # loc("Task-1"), + ], + ), + ) diff --git a/tests/profiling/collector/test_stack_asyncio_basic.py b/tests/profiling/collector/test_stack_asyncio_basic.py new file mode 100644 index 00000000000..0a25c193d0b --- /dev/null +++ b/tests/profiling/collector/test_stack_asyncio_basic.py @@ -0,0 +1,168 @@ +import pytest + + +@pytest.mark.subprocess( + env=dict( + DD_PROFILING_OUTPUT_PPROF="/tmp/test_stack_asyncio_basic", + ), + err=None, +) +# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test) +def test_asyncio_basic() -> None: + import asyncio + import os + import time + import uuid + + from ddtrace import ext + from ddtrace.internal.datadog.profiling import stack + from ddtrace.profiling import profiler + from ddtrace.trace import tracer + from tests.profiling.collector import pprof_utils + + assert stack.is_available, stack.failure_msg + + sleep_time = 0.2 + loop_run_time = 3 + + async def stuff() -> None: + start_time = time.time() + while time.time() < start_time + loop_run_time: + await asyncio.sleep(sleep_time) + + async def hello(): + t1 = asyncio.create_task(stuff(), name="sleep 1") + t2 = asyncio.create_task(stuff(), name="sleep 2") + await stuff() + return (t1, t2) + + resource = str(uuid.uuid4()) + span_type = ext.SpanTypes.WEB + + p = profiler.Profiler(tracer=tracer) + p.start() + with tracer.trace("test_asyncio_basic", resource=resource, span_type=span_type) as span: + span_id = span.span_id + local_root_span_id = span._local_root.span_id + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + main_task = loop.create_task(hello(), name="main") + + t1, t2 = loop.run_until_complete(main_task) + p.stop() + + t1_name = t1.get_name() + t2_name = t2.get_name() + + assert t1_name == "sleep 1" + assert t2_name == "sleep 2" + + output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid()) + + profile = pprof_utils.parse_newest_profile(output_filename) + + samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id") + assert len(samples_with_span_id) > 0 + + # get samples with task_name + samples = pprof_utils.get_samples_with_label_key(profile, "task name") + # The next fails if stack is not properly configured with asyncio task + # tracking via ddtrace.profiling._asyncio + assert len(samples) > 0 + + pprof_utils.assert_profile_has_sample( + profile, + samples, + expected_sample=pprof_utils.StackEvent( + thread_name="MainThread", + task_name="main", + span_id=span_id, + local_root_span_id=local_root_span_id, + locations=[ + pprof_utils.StackLocation( + function_name="hello", + filename="test_stack_asyncio_basic.py", + line_no=hello.__code__.co_firstlineno + 3, + ) + ], + ), + print_samples_on_failure=True, + ) + + # Currently, one of the children Tasks appears as part of the Parent Task. + # This means either for T1 or T2, the task_name label will be set to "main" + # even though they are different Tasks. + # Until this is fixed, we need to assert that either case is happening. + try: + pprof_utils.assert_profile_has_sample( + profile, + samples, + expected_sample=pprof_utils.StackEvent( + thread_name="MainThread", + task_name="main", + span_id=span_id, + local_root_span_id=local_root_span_id, + locations=[ + pprof_utils.StackLocation( + function_name="stuff", + filename="test_stack_asyncio_basic.py", + line_no=stuff.__code__.co_firstlineno + 3, + ), + ], + ), + ) + + pprof_utils.assert_profile_has_sample( + profile, + samples, + expected_sample=pprof_utils.StackEvent( + thread_name="MainThread", + task_name=t2_name, + span_id=span_id, + local_root_span_id=local_root_span_id, + locations=[ + pprof_utils.StackLocation( + function_name="stuff", + filename="test_stack_asyncio_basic.py", + line_no=stuff.__code__.co_firstlineno + 3, + ), + ], + ), + ) + except AssertionError: + pprof_utils.assert_profile_has_sample( + profile, + samples, + expected_sample=pprof_utils.StackEvent( + thread_name="MainThread", + task_name=t1_name, + span_id=span_id, + local_root_span_id=local_root_span_id, + locations=[ + pprof_utils.StackLocation( + function_name="stuff", + filename="test_stack_asyncio_basic.py", + line_no=stuff.__code__.co_firstlineno + 3, + ), + ], + ), + ) + + pprof_utils.assert_profile_has_sample( + profile, + samples, + expected_sample=pprof_utils.StackEvent( + thread_name="MainThread", + task_name="main", + span_id=span_id, + local_root_span_id=local_root_span_id, + locations=[ + pprof_utils.StackLocation( + function_name="stuff", + filename="test_stack_asyncio_basic.py", + line_no=stuff.__code__.co_firstlineno + 3, + ), + ], + ), + )