diff --git a/bpf/generictracer/nodejs.c b/bpf/generictracer/nodejs.c index 4a9b4c460..dde84a531 100644 --- a/bpf/generictracer/nodejs.c +++ b/bpf/generictracer/nodejs.c @@ -8,36 +8,50 @@ #include #include +#include #include +#include #include -SEC("uprobe/node:uv_fs_access") -int BPF_KPROBE(obi_uv_fs_access, void *loop, void *req, const char *path) { - (void)ctx; - (void)loop; - (void)req; +#include - // the obi nodejs agent (fdextractor.js) passes the file descriptor pair - // to the ebpf layer by invoking uv_fs_access() with an invalid path: - // /dev/null/obi/ where each fd is a left-zero-padded 4 digit - // number - static const char prefix[] = "/dev/null/obi"; - static const u8 prefix_size = sizeof(prefix) - 1; +enum { + k_delim_offset = 13, + k_fd1_offset = 14, + k_fd2_offset = 18, + k_ctx_fd_offset = 18, + k_max_fd_digits = 4 +}; - char buf[] = "/dev/null/obi/00000000"; +static __always_inline int handle_async_switch(char *buf, const u64 pid_tgid) { + u32 fd = 0; + for (u8 i = 0; i < k_max_fd_digits; ++i) { + fd *= 10; + fd += buf[k_ctx_fd_offset + i] - '0'; + } - enum { k_fd1_offset = 14, k_fd2_offset = 18, k_max_fd_digits = 4 }; + bpf_dbg_printk("nodejs_async_switch: %s, pid_tgid = %llx, fd = %u", buf, pid_tgid, fd); - if (bpf_probe_read_user(buf, sizeof(buf), path) != 0) { + const fd_key fkey = {.pid_tgid = pid_tgid, .fd = (s32)fd}; + const connection_info_t *conn = bpf_map_lookup_elem(&fd_to_connection, &fkey); + if (!conn) { + obi_ctx__del(pid_tgid); return 0; } - if (obi_bpf_memcmp(prefix, buf, prefix_size) != 0) { - return 0; + const tp_info_pid_t *tp = trace_info_for_connection(conn, TRACE_TYPE_SERVER); + if (tp && tp->valid) { + obi_ctx__set(pid_tgid, &tp->tp); + } else { + obi_ctx__del(pid_tgid); } + return 0; +} + +static __always_inline int handle_fd_correlation(char *buf, const u64 pid_tgid) { u32 fd1 = 0; u32 fd2 = 0; @@ -48,12 +62,57 @@ int BPF_KPROBE(obi_uv_fs_access, void *loop, void *req, const char *path) { fd2 += buf[k_fd2_offset + i] - '0'; } - bpf_dbg_printk("nodejs_correlation: %s, fd1 = %u, fd2 = %u", buf, fd1, fd2); + bpf_dbg_printk("nodejs_fd_correlation: %s, fd1 = %u, fd2 = %u", buf, fd1, fd2); - const u64 pid_tgid = bpf_get_current_pid_tgid(); const u64 key = (pid_tgid << 32) | fd2; bpf_map_update_elem(&nodejs_fd_map, &key, &fd1, BPF_ANY); return 0; } + +SEC("uprobe/node:uv_fs_access") +int BPF_KPROBE(obi_uv_fs_access, void *loop, void *req, const char *path) { + (void)ctx; + (void)loop; + (void)req; + + // the obi nodejs agent (fdextractor.js) passes signals to the ebpf layer + // by invoking uv_fs_access() with a fake path. Two formats are used: + // + // 1. fd pair correlation (outgoing -> incoming): + // /dev/null/obi/ — each fd is a left-zero-padded 4-digit number + // + // 2. async context switch (before-hook fires before each JS callback): + // /dev/null/obi-ctx/ — 4-digit incoming fd for the current async context + // + // Both paths share the prefix "/dev/null/obi" (13 chars). The character at + // position 13 distinguishes the two formats: + // '/' -> format 1 (fd pair) + // '-' -> format 2 (context switch, "-ctx/" follows) + static const char prefix[] = "/dev/null/obi"; + static const u8 prefix_size = sizeof(prefix) - 1; + + // Buffer sized to hold the longest path + null terminator. + // Both formats are exactly 22 characters long. + char buf[] = "/dev/null/obi/00000000"; + + if (bpf_probe_read_user(buf, sizeof(buf), path) != 0) { + return 0; + } + + if (obi_bpf_memcmp(prefix, buf, prefix_size) != 0) { + return 0; + } + + const u64 pid_tgid = bpf_get_current_pid_tgid(); + + // Async context switch: /dev/null/obi-ctx/XXXX + // Fires from the async_hooks 'before' callback in fdextractor.js to refresh + // traces_ctx_v1 before each JS callback. + if (buf[k_delim_offset] == '-') { + return handle_async_switch(buf, pid_tgid); + } + // fd pair correlation: /dev/null/obi/ + return handle_fd_correlation(buf, pid_tgid); +} diff --git a/devdocs/context-propagation.md b/devdocs/context-propagation.md index 3c2677ca4..c8f402e34 100644 --- a/devdocs/context-propagation.md +++ b/devdocs/context-propagation.md @@ -307,4 +307,4 @@ OBI allows injecting trace context into JSON logs. The following requirements mu - `CAP_SYS_ADMIN` capability and permission to use `bpf_probe_write_user` (kernel security lockdown mode should be `[none]`) - The target application writes logs in **JSON format** - BPFFS mounted at /sys/fs/bpf (or another mountpath configurable via `config.ebpf.bpf_fs_path`) -- Async primitives: only Go runtime is currently supported +- Async primitives: only Go and NodeJS runtimes are currently supported diff --git a/internal/test/integration/components/nodejsserver/app.js b/internal/test/integration/components/nodejsserver/app.js index 892de93ff..e23f18e35 100644 --- a/internal/test/integration/components/nodejsserver/app.js +++ b/internal/test/integration/components/nodejsserver/app.js @@ -137,6 +137,16 @@ app.get("/api/test-apm", async (req, res) => { } }); +app.get("/json_logger", (_req, res) => { + // Fixed async delay so all concurrent in-flight callbacks interleave inside + // the libuv event loop, exercising the traces_ctx_v1 context-switch fix. + setTimeout(() => { + const message = "this is a json log from node"; + process.stdout.write(JSON.stringify({ message, level: "INFO" }) + "\n"); + res.send(message); + }, 35); +}); + app.listen(port, () => { console.log("Server running on port " + port); }); diff --git a/internal/test/integration/configs/obi-config-log-enricher.yml b/internal/test/integration/configs/obi-config-log-enricher.yml index 31dd2c4a7..7c3a9889a 100644 --- a/internal/test/integration/configs/obi-config-log-enricher.yml +++ b/internal/test/integration/configs/obi-config-log-enricher.yml @@ -16,7 +16,7 @@ ebpf: log_enricher: services: - service: - - open_ports: "8380,50051" + - open_ports: "8380,50051,3030" discovery: min_process_age: "0s" log_config: "yaml" diff --git a/internal/test/integration/docker-compose-log-enricher.yml b/internal/test/integration/docker-compose-log-enricher.yml index 0585da606..4f4166d8b 100644 --- a/internal/test/integration/docker-compose-log-enricher.yml +++ b/internal/test/integration/docker-compose-log-enricher.yml @@ -21,6 +21,16 @@ services: ports: - "8381:8380" + testservernodejs: + build: + context: ../../.. + dockerfile: internal/test/integration/components/nodejsserver/Dockerfile + image: hatest-testserver-node + networks: + - shared + ports: + - "8383:3030" + obi: build: context: ../../.. @@ -51,6 +61,7 @@ services: OTEL_EBPF_HOSTNAME: "beyla" OTEL_EBPF_OTLP_TRACES_BATCH_TIMEOUT: "1ms" OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "5s" + OTEL_EBPF_BPF_TRACK_REQUEST_HEADERS: "true" OTEL_EBPF_PROCESSES_INTERVAL: "100ms" OTEL_EBPF_METRICS_FEATURES: "application,application_process,application_span,application_service_graph" OTEL_EBPF_PROMETHEUS_FEATURES: "application,application_span,application_process,application_service_graph" @@ -59,6 +70,8 @@ services: condition: service_started testservergrpcgo: condition: service_started + testservernodejs: + condition: service_started networks: shared: diff --git a/internal/test/integration/log_enricher_test.go b/internal/test/integration/log_enricher_test.go index d274ffcc4..8b8dbc806 100644 --- a/internal/test/integration/log_enricher_test.go +++ b/internal/test/integration/log_enricher_test.go @@ -7,7 +7,10 @@ import ( "bufio" "context" "encoding/json" + "fmt" + "net/http" "strings" + "sync" "testing" "time" @@ -43,8 +46,26 @@ var ( containerImage: "hatest-testserver-logenricher-grpc-go", message: "hello!", } + logEnricherNodeJSConstants = testServerConstants{ + url: "http://localhost:8383", + smokeEndpoint: "/smoke", + logEndpoint: "/json_logger", + containerImage: "hatest-testserver-node", + message: "this is a json log from node", + } ) +// nodejsTestTraceparents are fixed W3C traceparents used by testLogEnricherNodeJS. +// Fixed IDs allow exact equality assertions on trace_id and ordering assertions +// on the enriched container logs. +var nodejsTestTraceparents = [5]struct{ traceID, parentID string }{ + {"4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7"}, + {"7b5c1e7d8f2a4b6c9e0d3f1a2b4c5d6e", "1a2b3c4d5e6f7a8b"}, + {"a1b2c3d4e5f60718293a4b5c6d7e8f90", "fedcba9876543210"}, + {"0102030405060708090a0b0c0d0e0f10", "0102030405060708"}, + {"deadbeefcafebabe0123456789abcdef", "cafebabe01234567"}, +} + func containerLogs(t require.TestingT, cl *client.Client, containerID string) []string { reader, err := cl.ContainerLogs(context.TODO(), containerID, container.LogsOptions{ ShowStdout: true, @@ -82,6 +103,91 @@ func testContainerID(t require.TestingT, cl *client.Client, image string) string return "" } +// testLogEnricherNodeJS sends N concurrent requests, each carrying a distinct +// W3C traceparent, and verifies that every injected trace_id appears in an +// enriched container log line. The server introduces a random async delay so +// that multiple libuv I/O callbacks are in-flight simultaneously, exercising +// the traces_ctx_v1 context-switch fix in the async_hooks before hook. +func testLogEnricherNodeJS(t *testing.T) { + waitForTestComponentsNoMetrics(t, logEnricherNodeJSConstants.url+logEnricherNodeJSConstants.smokeEndpoint) + + cl, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + require.NoError(t, err) + defer cl.Close() + + require.EventuallyWithT(t, func(ct *assert.CollectT) { + // Fire one request per traceparent concurrently so all libuv callbacks + // are in-flight simultaneously. Goroutines are staggered by 5 ms so that + // requests arrive at the server in array order (server delay is 35 ms, + // much larger than the stagger), giving a deterministic log order. + var wg sync.WaitGroup + for i, tp := range nodejsTestTraceparents { + wg.Add(1) + go func(tp struct{ traceID, parentID string }) { + defer wg.Done() + req, err := http.NewRequest(http.MethodGet, + logEnricherNodeJSConstants.url+logEnricherNodeJSConstants.logEndpoint, nil) + if err != nil { + return + } + req.Header.Set("traceparent", fmt.Sprintf("00-%s-%s-01", tp.traceID, tp.parentID)) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return + } + resp.Body.Close() + }(tp) + // Small stagger between goroutine starts so HTTP requests reach the + // server in the same order they are launched. + if i < len(nodejsTestTraceparents)-1 { + time.Sleep(5 * time.Millisecond) + } + } + wg.Wait() + + containerID := testContainerID(ct, cl, logEnricherNodeJSConstants.containerImage) + require.NotEmpty(ct, containerID, "could not find test container ID") + logs := containerLogs(ct, cl, containerID) + require.NotEmpty(ct, logs) + + // Find the last log-position of each injected trace_id (most recent retry). + lastPos := make(map[string]int, len(nodejsTestTraceparents)) + lastSpanID := make(map[string]string, len(nodejsTestTraceparents)) + for i, line := range logs { + var fields map[string]string + if json.Unmarshal([]byte(line), &fields) != nil { + continue + } + if tid, ok := fields["trace_id"]; ok { + lastPos[tid] = i + lastSpanID[tid] = fields["span_id"] + } + } + + // Every injected trace_id must appear with a non-empty span_id. + for _, tp := range nodejsTestTraceparents { + _, found := lastPos[tp.traceID] + assert.True(ct, found, "no enriched log line found for trace_id %s", tp.traceID) + if found { + assert.NotEmpty(ct, lastSpanID[tp.traceID], "span_id missing for trace_id %s", tp.traceID) + } + } + + // Log lines must appear in the same order requests were made. + // Using last-occurrence positions compares within the most recent batch. + for i := 0; i < len(nodejsTestTraceparents)-1; i++ { + a, b := nodejsTestTraceparents[i], nodejsTestTraceparents[i+1] + posA, okA := lastPos[a.traceID] + posB, okB := lastPos[b.traceID] + if okA && okB { + assert.Less(ct, posA, posB, + "trace_id %s should appear before %s in logs (request order)", + a.traceID, b.traceID) + } + } + }, testTimeout, 500*time.Millisecond) +} + func testLogEnricher(t *testing.T, constants testServerConstants) { waitForTestComponentsNoMetrics(t, constants.url+constants.smokeEndpoint) diff --git a/internal/test/integration/suites_test.go b/internal/test/integration/suites_test.go index 5dd7bc422..b7db41eb3 100644 --- a/internal/test/integration/suites_test.go +++ b/internal/test/integration/suites_test.go @@ -717,6 +717,19 @@ func TestSuite_LogEnricherGoGRPC(t *testing.T) { require.NoError(t, compose.Close()) } +func TestSuite_LogEnricherNodeJS(t *testing.T) { + compose, err := docker.ComposeSuite("docker-compose-log-enricher.yml", path.Join(pathOutput, "test-suite-log-enricher-nodejs.log")) + require.NoError(t, err) + + compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=3030`, `OTEL_EBPF_EXECUTABLE_PATH=`) + require.NoError(t, compose.Up()) + + t.Run("Log Enricher Node.js", func(t *testing.T) { + testLogEnricherNodeJS(t) + }) + require.NoError(t, compose.Close()) +} + // Helpers var lockdownPath = "/sys/kernel/security/lockdown" diff --git a/pkg/internal/nodejs/fdextractor.js b/pkg/internal/nodejs/fdextractor.js index 388a65b4a..4205bcb59 100644 --- a/pkg/internal/nodejs/fdextractor.js +++ b/pkg/internal/nodejs/fdextractor.js @@ -16,7 +16,7 @@ net.Server.prototype.emit = orig.serverEmit; net.Socket.prototype.connect = orig.socketConnect; net.Socket.prototype.write = orig.socketWrite; -const { AsyncLocalStorage } = require('async_hooks'); +const { AsyncLocalStorage, createHook } = require('async_hooks'); const debug_enabled = false; @@ -44,6 +44,8 @@ net.Server.prototype.emit = function (event, ...args) { return orig.serverEmit.call(this, event, ...args); }; +const pad4 = n => String(n).padStart(4, '0'); + function correlate(incomingFd, outFd, socket) { if (incomingFd < 0 || outFd < 0 || incomingFd === outFd) { return Promise.resolve(); @@ -58,8 +60,6 @@ function correlate(incomingFd, outFd, socket) { ); } - const pad4 = n => String(n).padStart(4, '0'); - try { fs.accessSync(`/dev/null/obi/${pad4(incomingFd)}${pad4(outFd)}`) } catch (err) { @@ -101,3 +101,18 @@ net.Socket.prototype.write = function (data, ...rest) { return doWrite(); }; + +// Signal the BPF layer before each async callback so it can restore the correct +// trace context for this request into traces_ctx_v1. +// fs.accessSync is safe inside async_hooks callbacks: synchronous fs operations +// do not create AsyncWrap objects and therefore do not re-trigger this hook. +createHook({ + before() { + const store = als.getStore(); + if (store && store.incomingFd != null && store.incomingFd >= 0) { + try { + fs.accessSync(`/dev/null/obi-ctx/${pad4(store.incomingFd)}`); + } catch (_) {} + } + }, +}).enable();