-
Notifications
You must be signed in to change notification settings - Fork 129
Add support for python asyncio #966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |||||
| #include <bpfcore/bpf_helpers.h> | ||||||
|
|
||||||
| #include <maps/active_unix_socks.h> | ||||||
| #include <maps/python_current_context.h> | ||||||
|
|
||||||
| #include <pid/pid_helpers.h> | ||||||
|
|
||||||
|
|
@@ -18,6 +19,11 @@ static __always_inline u64 extra_runtime_id_with_task_id(const u64 id) { | |||||
|
|
||||||
| static __always_inline u64 extra_runtime_id() { | ||||||
| const u64 id = bpf_get_current_pid_tgid(); | ||||||
|
|
||||||
| u64 *context = bpf_map_lookup_elem(&python_current_context, &id); | ||||||
| if (context) { | ||||||
| bpf_dbg_printk( | ||||||
| "extra_runtime_id: LOOKUP python_current_context[host_id=%llx] = %llx", id, *context); | ||||||
| return (u64)(*context); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| } | ||||||
| return extra_runtime_id_with_task_id(id); | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,8 @@ | |
| #include <maps/server_traces.h> | ||
| #include <maps/tp_info_mem.h> | ||
| #include <maps/tp_char_buf_mem.h> | ||
| #include <maps/python_thread_context.h> | ||
| #include <maps/python_context_trace.h> | ||
|
Comment on lines
+34
to
+35
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: includes are ordered alphabetically (at least in a group) |
||
|
|
||
| #include <generictracer/types/puma_task_id.h> | ||
|
|
||
|
|
@@ -220,6 +222,41 @@ static __always_inline tp_info_pid_t *find_parent_process_trace(trace_key_t *t_k | |
| return NULL; | ||
| } | ||
|
|
||
| static __always_inline tp_info_pid_t *find_python_parent_trace(const trace_key_t *t_key) { | ||
| // Up to 5 levels of thread nesting allowed | ||
| enum { k_max_depth = 5 }; | ||
| trace_key_t key = *t_key; | ||
|
|
||
| for (u8 i = 0; i < k_max_depth; ++i) { | ||
| tp_info_pid_t *server_tp = bpf_map_lookup_elem(&server_traces, &key); | ||
|
|
||
| if (server_tp) { | ||
| bpf_dbg_printk( | ||
| "find_python_parent: FOUND tid=%d extra=%llx", key.p_key.tid, key.extra_id); | ||
| return server_tp; | ||
| } | ||
|
|
||
| // Not this thread's server request, try Python context chain | ||
| u64 *context_ptr = bpf_map_lookup_elem(&python_thread_context, &key); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok so here instead of |
||
| if (!context_ptr) { | ||
| bpf_dbg_printk( | ||
| "find_python_parent: MISS tid=%d extra=%llx", key.p_key.tid, key.extra_id); | ||
| break; | ||
| } | ||
|
|
||
| trace_key_t *p_key = bpf_map_lookup_elem(&python_context_trace, context_ptr); | ||
| if (!p_key) { | ||
| bpf_dbg_printk( | ||
| "find_python_parent: ctx MISS tid=%d ctx=%llx", key.p_key.tid, *context_ptr); | ||
| break; | ||
| } | ||
|
|
||
| key = *p_key; | ||
| } | ||
|
|
||
| return NULL; | ||
| } | ||
|
|
||
| static __always_inline tp_info_pid_t *find_parent_java_trace(trace_key_t *t_key) { | ||
| // Up to 3 levels of thread nesting allowed | ||
| enum { k_max_depth = 3 }; | ||
|
|
@@ -265,13 +302,20 @@ static __always_inline tp_info_pid_t *find_parent_trace(const pid_connection_inf | |
| t_key->p_key.ns, | ||
| t_key->extra_id); | ||
|
|
||
| tp_info_pid_t *python_parent = find_python_parent_trace(t_key); | ||
|
|
||
| if (python_parent) { | ||
|
marctc marked this conversation as resolved.
|
||
| return python_parent; | ||
| } | ||
|
|
||
| tp_info_pid_t *nginx_parent = find_nginx_parent_trace(p_conn, orig_dport); | ||
|
|
||
| if (nginx_parent) { | ||
| return nginx_parent; | ||
| } | ||
|
|
||
| tp_info_pid_t *puma_parent = find_puma_parent_trace(pid_tgid); | ||
|
|
||
| if (puma_parent) { | ||
| return puma_parent; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,61 @@ | ||||||
| // Copyright The OpenTelemetry Authors | ||||||
| // SPDX-License-Identifier: Apache-2.0 | ||||||
|
|
||||||
| //go:build obi_bpf_ignore | ||||||
|
|
||||||
| #include <bpfcore/vmlinux.h> | ||||||
| #include <bpfcore/bpf_helpers.h> | ||||||
|
|
||||||
| #include <common/trace_common.h> | ||||||
|
|
||||||
| #include <pid/pid.h> | ||||||
|
|
||||||
| static __always_inline trace_key_t make_trace_key(void *context) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| trace_key_t t_key = {}; | ||||||
| task_tid(&t_key.p_key); | ||||||
| if (context) { | ||||||
| t_key.extra_id = (u64)context; | ||||||
| } else { | ||||||
| t_key.extra_id = extra_runtime_id(); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so with regards to my comments above, here you can lookup |
||||||
| } | ||||||
| return t_key; | ||||||
| } | ||||||
|
|
||||||
| SEC("uprobe/libpython3.so:context_run") | ||||||
| int obi_uprobe_context_run(struct pt_regs *ctx) { | ||||||
| u64 id = bpf_get_current_pid_tgid(); | ||||||
|
|
||||||
| if (!valid_pid(id)) { | ||||||
| return 0; | ||||||
| } | ||||||
|
|
||||||
| void *context = (void *)PT_REGS_PARM1(ctx); | ||||||
| const trace_key_t t_key = make_trace_key(context); | ||||||
|
|
||||||
| bpf_dbg_printk("context_run: tid=%d ctx=%llx", t_key.p_key.tid, context); | ||||||
| bpf_map_update_elem(&python_thread_context, &t_key, &context, BPF_ANY); | ||||||
| bpf_map_update_elem(&python_current_context, &id, &context, BPF_ANY); | ||||||
|
Comment on lines
+36
to
+37
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we really need 2 maps here? But then t_key is basically the same information (the thread/task id encoded differently) mapped to a context (and more confusingly the I hope that makes sense, perhaps I am misunderstanding it. |
||||||
| return 0; | ||||||
| } | ||||||
|
|
||||||
| SEC("uprobe/libpython3.so:context_new_from_vars_ret") | ||||||
| int obi_uprobe_context_new_from_vars_ret(struct pt_regs *ctx) { | ||||||
| u64 id = bpf_get_current_pid_tgid(); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
|
||||||
| if (!valid_pid(id)) { | ||||||
| return 0; | ||||||
| } | ||||||
|
|
||||||
| void *context = (void *)PT_REGS_RC(ctx); | ||||||
|
|
||||||
| if (!context) { | ||||||
| return 0; | ||||||
| } | ||||||
|
|
||||||
| const trace_key_t t_key = make_trace_key(0); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand this correctly, this is where you associate this context with its parent? If so, couldn't you do something like:
|
||||||
|
|
||||||
| bpf_dbg_printk( | ||||||
| "context_new: tid=%d ctx=%llx extra=%llx", t_key.p_key.tid, context, t_key.extra_id); | ||||||
| bpf_map_update_elem(&python_context_trace, &context, &t_key, BPF_ANY); | ||||||
| return 0; | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <bpfcore/vmlinux.h> | ||
| #include <bpfcore/bpf_helpers.h> | ||
|
|
||
| #include <common/trace_key.h> | ||
| #include <common/pin_internal.h> | ||
| #include <common/map_sizing.h> | ||
|
|
||
| // Maps a Python context pointer to the trace_key_t of the server span that | ||
| // was active when the context was created. | ||
| struct { | ||
| __uint(type, BPF_MAP_TYPE_LRU_HASH); | ||
| __type(key, u64); // Python context pointer (PyObject*) | ||
| __type(value, trace_key_t); // Server parent trace key | ||
| __uint(max_entries, MAX_CONCURRENT_REQUESTS); | ||
| __uint(pinning, OBI_PIN_INTERNAL); | ||
| } python_context_trace SEC(".maps"); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <bpfcore/vmlinux.h> | ||
| #include <bpfcore/bpf_helpers.h> | ||
|
|
||
| #include <common/pin_internal.h> | ||
| #include <common/map_sizing.h> | ||
|
|
||
| // Host thread mapping: associates pid/tid with the Python context pointer | ||
| // active on that OS thread. | ||
| struct { | ||
| __uint(type, BPF_MAP_TYPE_LRU_HASH); | ||
| __type(key, u64); // thread id | ||
| __type(value, u64); // Python context pointer (PyObject*) | ||
| __uint(max_entries, MAX_CONCURRENT_REQUESTS); | ||
| __uint(pinning, OBI_PIN_INTERNAL); | ||
| } python_current_context SEC(".maps"); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <bpfcore/vmlinux.h> | ||
| #include <bpfcore/bpf_helpers.h> | ||
|
|
||
| #include <common/pin_internal.h> | ||
| #include <common/trace_key.h> | ||
| #include <common/map_sizing.h> | ||
|
|
||
| // Maps thread ID (trace_key_t) to the Python context pointer currently | ||
| // running on that thread. | ||
| struct { | ||
| __uint(type, BPF_MAP_TYPE_LRU_HASH); | ||
| __type(key, trace_key_t); | ||
| __type(value, u64); // Python context pointer (PyObject*) | ||
| __uint(max_entries, MAX_CONCURRENT_REQUESTS); | ||
| __uint(pinning, OBI_PIN_INTERNAL); | ||
| } python_thread_context SEC(".maps"); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| FROM python:3.12 | ||
| EXPOSE 8391 | ||
|
|
||
| WORKDIR /app | ||
|
|
||
| COPY internal/test/integration/components/pythonasync/requirements.txt . | ||
| RUN pip install -r requirements.txt | ||
|
|
||
| COPY internal/test/integration/components/pythonasync . | ||
|
|
||
| CMD ["python", "async_context_test.py"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| import asyncio | ||
| from aiohttp import web | ||
| import httpx | ||
| import requests | ||
| import os | ||
|
|
||
| BACKEND_URL = os.environ.get("BACKEND_URL", "http://localhost:8085") | ||
|
|
||
|
|
||
| async def startup(app: web.Application): | ||
| app["http_client"] = httpx.AsyncClient(timeout=30.0) | ||
|
|
||
|
|
||
| async def shutdown(app: web.Application): | ||
| await app["http_client"].aclose() | ||
|
|
||
|
|
||
| async def test_sequential(request: web.Request) -> web.Response: | ||
| req_id = int(request.match_info["req_id"]) | ||
| http_client = request.app["http_client"] | ||
| r1 = await http_client.get(f"{BACKEND_URL}/") | ||
| r2 = await http_client.get(f"{BACKEND_URL}/") | ||
| r3 = await http_client.get(f"{BACKEND_URL}/") | ||
| return web.json_response( | ||
| {"id": req_id, "calls": 3, "status_codes": [r1.status_code, r2.status_code, r3.status_code]} | ||
| ) | ||
|
|
||
|
|
||
| async def health_check(request: web.Request) -> web.Response: | ||
| return web.json_response({"status": "ok"}) | ||
|
|
||
|
|
||
| async def test_to_thread(request: web.Request) -> web.Response: | ||
| req_id = int(request.match_info["req_id"]) | ||
|
|
||
| def blocking_http_call(url: str): | ||
| response = requests.get(url, timeout=30) | ||
| return response.status_code | ||
|
|
||
| r1 = await asyncio.to_thread(blocking_http_call, f"{BACKEND_URL}/") | ||
| r2 = await asyncio.to_thread(blocking_http_call, f"{BACKEND_URL}/") | ||
| return web.json_response({"id": req_id, "calls": 2, "status_codes": [r1, r2]}) | ||
|
|
||
|
|
||
| async def test_parallel(request: web.Request) -> web.Response: | ||
| req_id = int(request.match_info["req_id"]) | ||
| http_client = request.app["http_client"] | ||
| r1, r2, r3 = await asyncio.gather( | ||
| http_client.get(f"{BACKEND_URL}/"), | ||
| http_client.get(f"{BACKEND_URL}/"), | ||
| http_client.get(f"{BACKEND_URL}/") | ||
| ) | ||
| return web.json_response( | ||
| {"id": req_id, "calls": 3, "status_codes": [r1.status_code, r2.status_code, r3.status_code]} | ||
| ) | ||
|
|
||
|
|
||
| async def test_create_task(request: web.Request) -> web.Response: | ||
| req_id = int(request.match_info["req_id"]) | ||
| http_client = request.app["http_client"] | ||
|
|
||
| async def call(i: int): | ||
| r = await http_client.get(f"{BACKEND_URL}/?i={i}") | ||
| return r.status_code | ||
|
|
||
| tasks = [asyncio.create_task(call(i)) for i in range(3)] | ||
| codes = await asyncio.gather(*tasks) | ||
| return web.json_response({"id": req_id, "calls": len(tasks), "status_codes": codes}) | ||
|
|
||
|
|
||
| def create_app() -> web.Application: | ||
| app = web.Application() | ||
| app.on_startup.append(startup) | ||
| app.on_cleanup.append(shutdown) | ||
| app.router.add_get("/sequential/{req_id}", test_sequential) | ||
| app.router.add_get("/health", health_check) | ||
| app.router.add_get("/to-thread/{req_id}", test_to_thread) | ||
| app.router.add_get("/parallel/{req_id}", test_parallel) | ||
| app.router.add_get("/create-task/{req_id}", test_create_task) | ||
| return app | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
|
marctc marked this conversation as resolved.
|
||
| web.run_app(create_app(), host="0.0.0.0", port=8391) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| aiohttp==3.9.5 | ||
| httpx==0.25.2 | ||
| requests==2.31.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reckon this is only relevant for Python, so I'd move this into
make_trace_keyinpython.cand leave this alone.Rationale:
python_current_context