diff --git a/bpf/common/runtime.h b/bpf/common/runtime.h index 220af5e898..902280524d 100644 --- a/bpf/common/runtime.h +++ b/bpf/common/runtime.h @@ -7,6 +7,7 @@ #include #include +#include #include @@ -18,6 +19,11 @@ static __always_inline u64 extra_runtime_id_with_task_id(const u64 id) { static __always_inline u64 extra_runtime_id() { const u64 id = bpf_get_current_pid_tgid(); - + u64 *context = bpf_map_lookup_elem(&python_current_context, &id); + if (context) { + bpf_dbg_printk( + "extra_runtime_id: LOOKUP python_current_context[host_id=%llx] = %llx", id, *context); + return (u64)(*context); + } return extra_runtime_id_with_task_id(id); } diff --git a/bpf/common/trace_common.h b/bpf/common/trace_common.h index 397e25e94d..47ad69092c 100644 --- a/bpf/common/trace_common.h +++ b/bpf/common/trace_common.h @@ -31,6 +31,8 @@ #include #include #include +#include +#include #include @@ -220,6 +222,41 @@ static __always_inline tp_info_pid_t *find_parent_process_trace(trace_key_t *t_k return NULL; } +static __always_inline tp_info_pid_t *find_python_parent_trace(const trace_key_t *t_key) { + // Up to 5 levels of thread nesting allowed + enum { k_max_depth = 5 }; + trace_key_t key = *t_key; + + for (u8 i = 0; i < k_max_depth; ++i) { + tp_info_pid_t *server_tp = bpf_map_lookup_elem(&server_traces, &key); + + if (server_tp) { + bpf_dbg_printk( + "find_python_parent: FOUND tid=%d extra=%llx", key.p_key.tid, key.extra_id); + return server_tp; + } + + // Not this thread's server request, try Python context chain + u64 *context_ptr = bpf_map_lookup_elem(&python_thread_context, &key); + if (!context_ptr) { + bpf_dbg_printk( + "find_python_parent: MISS tid=%d extra=%llx", key.p_key.tid, key.extra_id); + break; + } + + trace_key_t *p_key = bpf_map_lookup_elem(&python_context_trace, context_ptr); + if (!p_key) { + bpf_dbg_printk( + "find_python_parent: ctx MISS tid=%d ctx=%llx", key.p_key.tid, *context_ptr); + break; + } + + key = *p_key; + } + + return NULL; +} + static __always_inline tp_info_pid_t *find_parent_java_trace(trace_key_t *t_key) { // Up to 3 levels of thread nesting allowed enum { k_max_depth = 3 }; @@ -265,6 +302,12 @@ static __always_inline tp_info_pid_t *find_parent_trace(const pid_connection_inf t_key->p_key.ns, t_key->extra_id); + tp_info_pid_t *python_parent = find_python_parent_trace(t_key); + + if (python_parent) { + return python_parent; + } + tp_info_pid_t *nginx_parent = find_nginx_parent_trace(p_conn, orig_dport); if (nginx_parent) { @@ -272,6 +315,7 @@ static __always_inline tp_info_pid_t *find_parent_trace(const pid_connection_inf } tp_info_pid_t *puma_parent = find_puma_parent_trace(pid_tgid); + if (puma_parent) { return puma_parent; } diff --git a/bpf/generictracer/generictracer.c b/bpf/generictracer/generictracer.c index 0609cbad5a..4f0ddcbfed 100644 --- a/bpf/generictracer/generictracer.c +++ b/bpf/generictracer/generictracer.c @@ -9,5 +9,6 @@ #include "nodejs.c" #include "java_tls.c" #include "ruby.c" +#include "python.c" char __license[] SEC("license") = "Dual MIT/GPL"; diff --git a/bpf/generictracer/python.c b/bpf/generictracer/python.c new file mode 100644 index 0000000000..cd8df5e626 --- /dev/null +++ b/bpf/generictracer/python.c @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build obi_bpf_ignore + +#include +#include + +#include + +#include + +static __always_inline trace_key_t make_trace_key(void *context) { + trace_key_t t_key = {}; + task_tid(&t_key.p_key); + if (context) { + t_key.extra_id = (u64)context; + } else { + t_key.extra_id = extra_runtime_id(); + } + return t_key; +} + +SEC("uprobe/libpython3.so:context_run") +int obi_uprobe_context_run(struct pt_regs *ctx) { + u64 id = bpf_get_current_pid_tgid(); + + if (!valid_pid(id)) { + return 0; + } + + void *context = (void *)PT_REGS_PARM1(ctx); + const trace_key_t t_key = make_trace_key(context); + + bpf_dbg_printk("context_run: tid=%d ctx=%llx", t_key.p_key.tid, context); + bpf_map_update_elem(&python_thread_context, &t_key, &context, BPF_ANY); + bpf_map_update_elem(&python_current_context, &id, &context, BPF_ANY); + return 0; +} + +SEC("uprobe/libpython3.so:context_new_from_vars_ret") +int obi_uprobe_context_new_from_vars_ret(struct pt_regs *ctx) { + u64 id = bpf_get_current_pid_tgid(); + + if (!valid_pid(id)) { + return 0; + } + + void *context = (void *)PT_REGS_RC(ctx); + + if (!context) { + return 0; + } + + const trace_key_t t_key = make_trace_key(0); + + bpf_dbg_printk( + "context_new: tid=%d ctx=%llx extra=%llx", t_key.p_key.tid, context, t_key.extra_id); + bpf_map_update_elem(&python_context_trace, &context, &t_key, BPF_ANY); + return 0; +} diff --git a/bpf/maps/python_context_trace.h b/bpf/maps/python_context_trace.h new file mode 100644 index 0000000000..7fd9112047 --- /dev/null +++ b/bpf/maps/python_context_trace.h @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include + +#include +#include +#include + +// Maps a Python context pointer to the trace_key_t of the server span that +// was active when the context was created. +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, u64); // Python context pointer (PyObject*) + __type(value, trace_key_t); // Server parent trace key + __uint(max_entries, MAX_CONCURRENT_REQUESTS); + __uint(pinning, OBI_PIN_INTERNAL); +} python_context_trace SEC(".maps"); diff --git a/bpf/maps/python_current_context.h b/bpf/maps/python_current_context.h new file mode 100644 index 0000000000..3dd0b1cced --- /dev/null +++ b/bpf/maps/python_current_context.h @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include + +#include +#include + +// Host thread mapping: associates pid/tid with the Python context pointer +// active on that OS thread. +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, u64); // thread id + __type(value, u64); // Python context pointer (PyObject*) + __uint(max_entries, MAX_CONCURRENT_REQUESTS); + __uint(pinning, OBI_PIN_INTERNAL); +} python_current_context SEC(".maps"); diff --git a/bpf/maps/python_thread_context.h b/bpf/maps/python_thread_context.h new file mode 100644 index 0000000000..cfc2d580b5 --- /dev/null +++ b/bpf/maps/python_thread_context.h @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include + +#include +#include +#include + +// Maps thread ID (trace_key_t) to the Python context pointer currently +// running on that thread. +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, trace_key_t); + __type(value, u64); // Python context pointer (PyObject*) + __uint(max_entries, MAX_CONCURRENT_REQUESTS); + __uint(pinning, OBI_PIN_INTERNAL); +} python_thread_context SEC(".maps"); diff --git a/internal/test/integration/components/pythonasync/Dockerfile b/internal/test/integration/components/pythonasync/Dockerfile new file mode 100644 index 0000000000..d722e9d25c --- /dev/null +++ b/internal/test/integration/components/pythonasync/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.12 +EXPOSE 8391 + +WORKDIR /app + +COPY internal/test/integration/components/pythonasync/requirements.txt . +RUN pip install -r requirements.txt + +COPY internal/test/integration/components/pythonasync . + +CMD ["python", "async_context_test.py"] diff --git a/internal/test/integration/components/pythonasync/async_context_test.py b/internal/test/integration/components/pythonasync/async_context_test.py new file mode 100644 index 0000000000..4940ce652d --- /dev/null +++ b/internal/test/integration/components/pythonasync/async_context_test.py @@ -0,0 +1,84 @@ +import asyncio +from aiohttp import web +import httpx +import requests +import os + +BACKEND_URL = os.environ.get("BACKEND_URL", "http://localhost:8085") + + +async def startup(app: web.Application): + app["http_client"] = httpx.AsyncClient(timeout=30.0) + + +async def shutdown(app: web.Application): + await app["http_client"].aclose() + + +async def test_sequential(request: web.Request) -> web.Response: + req_id = int(request.match_info["req_id"]) + http_client = request.app["http_client"] + r1 = await http_client.get(f"{BACKEND_URL}/") + r2 = await http_client.get(f"{BACKEND_URL}/") + r3 = await http_client.get(f"{BACKEND_URL}/") + return web.json_response( + {"id": req_id, "calls": 3, "status_codes": [r1.status_code, r2.status_code, r3.status_code]} + ) + + +async def health_check(request: web.Request) -> web.Response: + return web.json_response({"status": "ok"}) + + +async def test_to_thread(request: web.Request) -> web.Response: + req_id = int(request.match_info["req_id"]) + + def blocking_http_call(url: str): + response = requests.get(url, timeout=30) + return response.status_code + + r1 = await asyncio.to_thread(blocking_http_call, f"{BACKEND_URL}/") + r2 = await asyncio.to_thread(blocking_http_call, f"{BACKEND_URL}/") + return web.json_response({"id": req_id, "calls": 2, "status_codes": [r1, r2]}) + + +async def test_parallel(request: web.Request) -> web.Response: + req_id = int(request.match_info["req_id"]) + http_client = request.app["http_client"] + r1, r2, r3 = await asyncio.gather( + http_client.get(f"{BACKEND_URL}/"), + http_client.get(f"{BACKEND_URL}/"), + http_client.get(f"{BACKEND_URL}/") + ) + return web.json_response( + {"id": req_id, "calls": 3, "status_codes": [r1.status_code, r2.status_code, r3.status_code]} + ) + + +async def test_create_task(request: web.Request) -> web.Response: + req_id = int(request.match_info["req_id"]) + http_client = request.app["http_client"] + + async def call(i: int): + r = await http_client.get(f"{BACKEND_URL}/?i={i}") + return r.status_code + + tasks = [asyncio.create_task(call(i)) for i in range(3)] + codes = await asyncio.gather(*tasks) + return web.json_response({"id": req_id, "calls": len(tasks), "status_codes": codes}) + + +def create_app() -> web.Application: + app = web.Application() + app.on_startup.append(startup) + app.on_cleanup.append(shutdown) + app.router.add_get("/sequential/{req_id}", test_sequential) + app.router.add_get("/health", health_check) + app.router.add_get("/to-thread/{req_id}", test_to_thread) + app.router.add_get("/parallel/{req_id}", test_parallel) + app.router.add_get("/create-task/{req_id}", test_create_task) + return app + + +if __name__ == "__main__": + web.run_app(create_app(), host="0.0.0.0", port=8391) diff --git a/internal/test/integration/components/pythonasync/requirements.txt b/internal/test/integration/components/pythonasync/requirements.txt new file mode 100644 index 0000000000..5a98aef4d6 --- /dev/null +++ b/internal/test/integration/components/pythonasync/requirements.txt @@ -0,0 +1,3 @@ +aiohttp==3.9.5 +httpx==0.25.2 +requests==2.31.0 diff --git a/internal/test/integration/docker-compose-python-async.yml b/internal/test/integration/docker-compose-python-async.yml new file mode 100644 index 0000000000..dc4d3938f6 --- /dev/null +++ b/internal/test/integration/docker-compose-python-async.yml @@ -0,0 +1,111 @@ +version: "3.8" + +services: + testserver: + build: + context: ../../.. + dockerfile: internal/test/integration/components/testserver/Dockerfile + image: hatest-testserver + ports: + - "8085:8080" + + pythonasync: + build: + context: ../../.. + dockerfile: internal/test/integration/components/pythonasync/Dockerfile + image: hatest-pythonasync + ports: + - "8391:8391" + environment: + LOG_LEVEL: DEBUG + BACKEND_URL: "http://testserver:8080" + depends_on: + - testserver + + obi: + build: + context: ../../.. + dockerfile: ./internal/test/integration/components/ebpf-instrument/Dockerfile + command: + - --config=/configs/obi-config.yml + volumes: + - ./configs/:/configs + - ./system/sys/kernel/security${SECURITY_CONFIG_SUFFIX}:/sys/kernel/security + - ../../../testoutput:/coverage + - ../../../testoutput/run-python-async:/var/run/beyla + image: hatest-obi + privileged: true + pid: "host" + environment: + GOCOVERDIR: "/coverage" + OTEL_EBPF_TRACE_PRINTER: "text" + OTEL_EBPF_OPEN_PORT: "8391,8080" + OTEL_EBPF_METRICS_FEATURES: "application,application_span_otel,application_process,application_service_graph,ebpf,application_host" + OTEL_EBPF_PROMETHEUS_FEATURES: "application,application_span_otel,application_process,application_service_graph,ebpf,application_host" + OTEL_EBPF_DISCOVERY_POLL_INTERVAL: 500ms + OTEL_EBPF_EXECUTABLE_NAME: "" + OTEL_EBPF_SERVICE_NAMESPACE: "integration-test" + OTEL_EBPF_METRICS_INTERVAL: "10ms" + OTEL_EBPF_OTLP_TRACES_BATCH_TIMEOUT: "1ms" + OTEL_EBPF_BPF_BATCH_TIMEOUT: "10ms" + OTEL_EBPF_LOG_LEVEL: "DEBUG" + OTEL_EBPF_BPF_DEBUG: "TRUE" + OTEL_EBPF_INTERNAL_METRICS_PROMETHEUS_PORT: 8999 + OTEL_EBPF_PROCESSES_INTERVAL: "100ms" + OTEL_EBPF_HOSTNAME: "beyla" + OTEL_EBPF_PROMETHEUS_EXTRA_SPAN_RESOURCE_ATTRIBUTES: "service.version" + OTEL_EBPF_EXTRA_SPAN_RESOURCE_ATTRIBUTES: "service.version" + ports: + - "8999:8999" + depends_on: + pythonasync: + condition: service_started + testserver: + condition: service_started + jaeger: + condition: service_started + + otelcol: + image: otel/opentelemetry-collector-contrib:0.118.0@sha256:3f6274fe609da4f9964fbc2ef36b83d08d47964fbb11629251393590d3924072 + container_name: otel-col + deploy: + resources: + limits: + memory: 125M + restart: unless-stopped + command: ["--config=/etc/otelcol-config/otelcol-config.yml"] + volumes: + - ./configs/:/etc/otelcol-config + ports: + - "4317" + - "4318" + - "9464" + - "8888" + depends_on: + obi: + condition: service_started + prometheus: + condition: service_started + + prometheus: + image: quay.io/prometheus/prometheus:v2.55.1@sha256:2659f4c2ebb718e7695cb9b25ffa7d6be64db013daba13e05c875451cf51b0d3 + container_name: prometheus + command: + - --config.file=/etc/prometheus/prometheus-config.yml + - --web.enable-lifecycle + - --enable-feature=exemplar-storage + - --web.route-prefix=/ + volumes: + - ./configs/:/etc/prometheus + ports: + - "9090:9090" + + jaeger: + image: jaegertracing/all-in-one:1.60@sha256:4fd2d70fa347d6a47e79fcb06b1c177e6079f92cba88b083153d56263082135e + ports: + - "16686:16686" + - "4317" + - "4318" + environment: + - COLLECTOR_OTLP_ENABLED=true + - LOG_LEVEL=debug diff --git a/internal/test/integration/suites_test.go b/internal/test/integration/suites_test.go index 9f63970e52..d03cbb1e2b 100644 --- a/internal/test/integration/suites_test.go +++ b/internal/test/integration/suites_test.go @@ -464,6 +464,17 @@ func TestSuite_JavaKafkaLargeBuffer(t *testing.T) { require.NoError(t, compose.Close()) } +func TestSuite_PythonAsync(t *testing.T) { + compose, err := docker.ComposeSuite("docker-compose-python-async.yml", path.Join(pathOutput, "test-suite-python-async.log")) + require.NoError(t, err) + require.NoError(t, compose.Up()) + t.Run("Python Async Sequential", testPythonAsyncSequential) + t.Run("Python Async To Thread", testPythonAsyncToThread) + t.Run("Python Async Parallel", testPythonAsyncParallel) + t.Run("Python Async Create Task", testPythonAsyncCreateTask) + require.NoError(t, compose.Close()) +} + func TestSuite_PythonRedis(t *testing.T) { compose, err := docker.ComposeSuite("docker-compose-python-redis.yml", path.Join(pathOutput, "test-suite-python-redis.log")) require.NoError(t, err) diff --git a/internal/test/integration/traces_test.go b/internal/test/integration/traces_test.go index eb2186abf8..59a8cc9faa 100644 --- a/internal/test/integration/traces_test.go +++ b/internal/test/integration/traces_test.go @@ -6,6 +6,8 @@ package integration import ( "fmt" "net/http" + "net/url" + "strconv" "strings" "testing" "time" @@ -1523,3 +1525,96 @@ func testHTTPTracesNestedNodeJSLargeHTTPS(t *testing.T) { // We must see two children require.Len(t, children, 2) } + +func testPythonAsyncEndpoint(t *testing.T, endpoint string) { + waitForTestComponentsSub(t, "http://localhost:8391", "/health") + + encodedEndpoint := url.PathEscape(endpoint) + + for i := 1; i <= 4; i++ { + go ti.DoHTTPGet(t, "http://localhost:8391"+endpoint+strconv.Itoa(i), 200) + } + + for i := 1; i <= 4; i++ { + slug := strconv.Itoa(i) + urlPath := endpoint + slug + var trace jaeger.Trace + test.Eventually(t, testTimeout, func(t require.TestingT) { + resp, err := http.Get(jaegerQueryURL + "?service=python3.12&operation=GET%20" + encodedEndpoint + slug) + require.NoError(t, err) + if resp == nil { + return + } + require.Equal(t, http.StatusOK, resp.StatusCode) + var tq jaeger.TracesQuery + require.NoError(t, json.NewDecoder(resp.Body).Decode(&tq)) + traces := tq.FindBySpan(jaeger.Tag{Key: "url.path", Type: "string", Value: urlPath}) + require.GreaterOrEqual(t, len(traces), 1) + trace = traces[0] + + res := trace.FindByOperationName("GET "+urlPath, "server") + require.GreaterOrEqual(t, len(res), 1) + server := res[0] + require.NotEmpty(t, server.TraceID) + require.NotEmpty(t, server.SpanID) + + sd := server.Diff( + jaeger.Tag{Key: "http.request.method", Type: "string", Value: "GET"}, + jaeger.Tag{Key: "http.response.status_code", Type: "int64", Value: float64(200)}, + jaeger.Tag{Key: "url.path", Type: "string", Value: urlPath}, + jaeger.Tag{Key: "server.port", Type: "int64", Value: float64(8391)}, + jaeger.Tag{Key: "span.kind", Type: "string", Value: "server"}, + ) + assert.Empty(t, sd, sd.String()) + + res = trace.FindByOperationName("GET /", "client") + require.GreaterOrEqual(t, len(res), 1) + client := res[0] + require.NotEmpty(t, client.TraceID) + require.Equal(t, server.TraceID, client.TraceID) + require.NotEmpty(t, client.SpanID) + + sd = client.Diff( + jaeger.Tag{Key: "http.request.method", Type: "string", Value: "GET"}, + jaeger.Tag{Key: "http.response.status_code", Type: "int64", Value: float64(200)}, + jaeger.Tag{Key: "server.port", Type: "int64", Value: float64(8080)}, + jaeger.Tag{Key: "span.kind", Type: "string", Value: "client"}, + ) + assert.Empty(t, sd, sd.String()) + + res = trace.FindByOperationName("GET /", "server") + require.GreaterOrEqual(t, len(res), 1) + for _, ts := range res { + require.Equal(t, server.TraceID, ts.TraceID) + parent, ok := trace.ParentOf(&ts) + require.True(t, ok) + require.Equal(t, server.TraceID, parent.TraceID) + + sd = ts.Diff( + jaeger.Tag{Key: "http.request.method", Type: "string", Value: "GET"}, + jaeger.Tag{Key: "http.response.status_code", Type: "int64", Value: float64(200)}, + jaeger.Tag{Key: "url.path", Type: "string", Value: "/"}, + jaeger.Tag{Key: "server.port", Type: "int64", Value: float64(8080)}, + jaeger.Tag{Key: "span.kind", Type: "string", Value: "server"}, + ) + assert.Empty(t, sd, sd.String()) + } + }, test.Interval(100*time.Millisecond)) + } +} + +func testPythonAsyncSequential(t *testing.T) { + testPythonAsyncEndpoint(t, "/sequential/") +} + +func testPythonAsyncToThread(t *testing.T) { + testPythonAsyncEndpoint(t, "/to-thread/") +} + +func testPythonAsyncParallel(t *testing.T) { + testPythonAsyncEndpoint(t, "/parallel/") +} + +func testPythonAsyncCreateTask(t *testing.T) { + testPythonAsyncEndpoint(t, "/create-task/") +} diff --git a/pkg/internal/ebpf/generictracer/generictracer.go b/pkg/internal/ebpf/generictracer/generictracer.go index 9fbc4caf8d..874524fafc 100644 --- a/pkg/internal/ebpf/generictracer/generictracer.go +++ b/pkg/internal/ebpf/generictracer/generictracer.go @@ -418,6 +418,20 @@ func (p *Tracer) UProbes() map[string]map[string][]*ebpfcommon.ProbeDesc { Start: p.bpfObjects.ObiRbObjCallInitKw, }}, }, + "libpython3.": { // python asyncio (from python 3.9) + "context_run": {{ + Required: false, + Start: p.bpfObjects.ObiUprobeContextRun, + }}, + "PyContext_CopyCurrent": {{ + Required: false, + End: p.bpfObjects.ObiUprobeContextNewFromVarsRet, + }}, + "context_new_from_vars": {{ // In Docker, PyContext_CopyCurrent has tail recursion optimization, so we need this function instead + Required: false, + End: p.bpfObjects.ObiUprobeContextNewFromVarsRet, + }}, + }, } }