Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 77 additions & 18 deletions bpf/generictracer/nodejs.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,50 @@
#include <bpfcore/bpf_tracing.h>

#include <common/strings.h>
#include <common/tracing.h>

#include <logger/bpf_dbg.h>

#include <maps/fd_to_connection.h>
#include <maps/nodejs_fd_map.h>

SEC("uprobe/node:uv_fs_access")
int BPF_KPROBE(obi_uv_fs_access, void *loop, void *req, const char *path) {
(void)ctx;
(void)loop;
(void)req;
#include <shared/obi_ctx.h>

// the obi nodejs agent (fdextractor.js) passes the file descriptor pair
// to the ebpf layer by invoking uv_fs_access() with an invalid path:
// /dev/null/obi/<fd1><fd2> where each fd is a left-zero-padded 4 digit
// number
static const char prefix[] = "/dev/null/obi";
static const u8 prefix_size = sizeof(prefix) - 1;
enum {
k_delim_offset = 13,
k_fd1_offset = 14,
k_fd2_offset = 18,
k_ctx_fd_offset = 18,
k_max_fd_digits = 4
};

char buf[] = "/dev/null/obi/00000000";
static __always_inline int handle_async_switch(char *buf, const u64 pid_tgid) {
u32 fd = 0;
for (u8 i = 0; i < k_max_fd_digits; ++i) {
fd *= 10;
fd += buf[k_ctx_fd_offset + i] - '0';
}

enum { k_fd1_offset = 14, k_fd2_offset = 18, k_max_fd_digits = 4 };
bpf_dbg_printk("nodejs_async_switch: %s, pid_tgid = %llx, fd = %u", buf, pid_tgid, fd);

if (bpf_probe_read_user(buf, sizeof(buf), path) != 0) {
const fd_key fkey = {.pid_tgid = pid_tgid, .fd = (s32)fd};
const connection_info_t *conn = bpf_map_lookup_elem(&fd_to_connection, &fkey);
if (!conn) {
obi_ctx__del(pid_tgid);
return 0;
}

if (obi_bpf_memcmp(prefix, buf, prefix_size) != 0) {
return 0;
const tp_info_pid_t *tp = trace_info_for_connection(conn, TRACE_TYPE_SERVER);
if (tp && tp->valid) {
obi_ctx__set(pid_tgid, &tp->tp);
} else {
obi_ctx__del(pid_tgid);
}

return 0;
}

static __always_inline int handle_fd_correlation(char *buf, const u64 pid_tgid) {
u32 fd1 = 0;
u32 fd2 = 0;

Expand All @@ -48,12 +62,57 @@ int BPF_KPROBE(obi_uv_fs_access, void *loop, void *req, const char *path) {
fd2 += buf[k_fd2_offset + i] - '0';
}

bpf_dbg_printk("nodejs_correlation: %s, fd1 = %u, fd2 = %u", buf, fd1, fd2);
bpf_dbg_printk("nodejs_fd_correlation: %s, fd1 = %u, fd2 = %u", buf, fd1, fd2);

const u64 pid_tgid = bpf_get_current_pid_tgid();
const u64 key = (pid_tgid << 32) | fd2;

bpf_map_update_elem(&nodejs_fd_map, &key, &fd1, BPF_ANY);

return 0;
}

SEC("uprobe/node:uv_fs_access")
int BPF_KPROBE(obi_uv_fs_access, void *loop, void *req, const char *path) {
(void)ctx;
(void)loop;
(void)req;

// the obi nodejs agent (fdextractor.js) passes signals to the ebpf layer
// by invoking uv_fs_access() with a fake path. Two formats are used:
//
// 1. fd pair correlation (outgoing -> incoming):
// /dev/null/obi/<fd1><fd2> — each fd is a left-zero-padded 4-digit number
//
// 2. async context switch (before-hook fires before each JS callback):
// /dev/null/obi-ctx/<fd> — 4-digit incoming fd for the current async context
//
// Both paths share the prefix "/dev/null/obi" (13 chars). The character at
Comment thread
mmat11 marked this conversation as resolved.
// position 13 distinguishes the two formats:
// '/' -> format 1 (fd pair)
// '-' -> format 2 (context switch, "-ctx/" follows)
static const char prefix[] = "/dev/null/obi";
static const u8 prefix_size = sizeof(prefix) - 1;

// Buffer sized to hold the longest path + null terminator.
// Both formats are exactly 22 characters long.
char buf[] = "/dev/null/obi/00000000";

if (bpf_probe_read_user(buf, sizeof(buf), path) != 0) {
return 0;
}

if (obi_bpf_memcmp(prefix, buf, prefix_size) != 0) {
return 0;
}

const u64 pid_tgid = bpf_get_current_pid_tgid();

// Async context switch: /dev/null/obi-ctx/XXXX
// Fires from the async_hooks 'before' callback in fdextractor.js to refresh
// traces_ctx_v1 before each JS callback.
if (buf[k_delim_offset] == '-') {
return handle_async_switch(buf, pid_tgid);
}
// fd pair correlation: /dev/null/obi/<fd1><fd2>
return handle_fd_correlation(buf, pid_tgid);
}
2 changes: 1 addition & 1 deletion devdocs/context-propagation.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,4 @@ OBI allows injecting trace context into JSON logs. The following requirements mu
- `CAP_SYS_ADMIN` capability and permission to use `bpf_probe_write_user` (kernel security lockdown mode should be `[none]`)
- The target application writes logs in **JSON format**
- BPFFS mounted at /sys/fs/bpf (or another mountpath configurable via `config.ebpf.bpf_fs_path`)
- Async primitives: only Go runtime is currently supported
- Async primitives: only Go and NodeJS runtimes are currently supported
10 changes: 10 additions & 0 deletions internal/test/integration/components/nodejsserver/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ app.get("/api/test-apm", async (req, res) => {
}
});

app.get("/json_logger", (_req, res) => {
// Fixed async delay so all concurrent in-flight callbacks interleave inside
// the libuv event loop, exercising the traces_ctx_v1 context-switch fix.
setTimeout(() => {
const message = "this is a json log from node";
process.stdout.write(JSON.stringify({ message, level: "INFO" }) + "\n");
res.send(message);
}, 35);
});

app.listen(port, () => {
console.log("Server running on port " + port);
});
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ebpf:
log_enricher:
services:
- service:
- open_ports: "8380,50051"
- open_ports: "8380,50051,3030"
discovery:
min_process_age: "0s"
log_config: "yaml"
13 changes: 13 additions & 0 deletions internal/test/integration/docker-compose-log-enricher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ services:
ports:
- "8381:8380"

testservernodejs:
build:
context: ../../..
dockerfile: internal/test/integration/components/nodejsserver/Dockerfile
image: hatest-testserver-node
networks:
- shared
ports:
- "8383:3030"

obi:
build:
context: ../../..
Expand Down Expand Up @@ -51,6 +61,7 @@ services:
OTEL_EBPF_HOSTNAME: "beyla"
OTEL_EBPF_OTLP_TRACES_BATCH_TIMEOUT: "1ms"
OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "5s"
OTEL_EBPF_BPF_TRACK_REQUEST_HEADERS: "true"
OTEL_EBPF_PROCESSES_INTERVAL: "100ms"
OTEL_EBPF_METRICS_FEATURES: "application,application_process,application_span,application_service_graph"
OTEL_EBPF_PROMETHEUS_FEATURES: "application,application_span,application_process,application_service_graph"
Expand All @@ -59,6 +70,8 @@ services:
condition: service_started
testservergrpcgo:
condition: service_started
testservernodejs:
condition: service_started

networks:
shared:
106 changes: 106 additions & 0 deletions internal/test/integration/log_enricher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -43,8 +46,26 @@ var (
containerImage: "hatest-testserver-logenricher-grpc-go",
message: "hello!",
}
logEnricherNodeJSConstants = testServerConstants{
url: "http://localhost:8383",
smokeEndpoint: "/smoke",
logEndpoint: "/json_logger",
containerImage: "hatest-testserver-node",
message: "this is a json log from node",
}
)

// nodejsTestTraceparents are fixed W3C traceparents used by testLogEnricherNodeJS.
// Fixed IDs allow exact equality assertions on trace_id and ordering assertions
// on the enriched container logs.
var nodejsTestTraceparents = [5]struct{ traceID, parentID string }{
{"4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7"},
{"7b5c1e7d8f2a4b6c9e0d3f1a2b4c5d6e", "1a2b3c4d5e6f7a8b"},
{"a1b2c3d4e5f60718293a4b5c6d7e8f90", "fedcba9876543210"},
{"0102030405060708090a0b0c0d0e0f10", "0102030405060708"},
{"deadbeefcafebabe0123456789abcdef", "cafebabe01234567"},
}

func containerLogs(t require.TestingT, cl *client.Client, containerID string) []string {
reader, err := cl.ContainerLogs(context.TODO(), containerID, container.LogsOptions{
ShowStdout: true,
Expand Down Expand Up @@ -82,6 +103,91 @@ func testContainerID(t require.TestingT, cl *client.Client, image string) string
return ""
}

// testLogEnricherNodeJS sends N concurrent requests, each carrying a distinct
// W3C traceparent, and verifies that every injected trace_id appears in an
// enriched container log line. The server introduces a random async delay so
// that multiple libuv I/O callbacks are in-flight simultaneously, exercising
// the traces_ctx_v1 context-switch fix in the async_hooks before hook.
func testLogEnricherNodeJS(t *testing.T) {
waitForTestComponentsNoMetrics(t, logEnricherNodeJSConstants.url+logEnricherNodeJSConstants.smokeEndpoint)

cl, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
require.NoError(t, err)
defer cl.Close()

require.EventuallyWithT(t, func(ct *assert.CollectT) {
// Fire one request per traceparent concurrently so all libuv callbacks
// are in-flight simultaneously. Goroutines are staggered by 5 ms so that
// requests arrive at the server in array order (server delay is 35 ms,
// much larger than the stagger), giving a deterministic log order.
var wg sync.WaitGroup
for i, tp := range nodejsTestTraceparents {
wg.Add(1)
go func(tp struct{ traceID, parentID string }) {
defer wg.Done()
req, err := http.NewRequest(http.MethodGet,
logEnricherNodeJSConstants.url+logEnricherNodeJSConstants.logEndpoint, nil)
if err != nil {
return
}
req.Header.Set("traceparent", fmt.Sprintf("00-%s-%s-01", tp.traceID, tp.parentID))
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
resp.Body.Close()
}(tp)
// Small stagger between goroutine starts so HTTP requests reach the
// server in the same order they are launched.
if i < len(nodejsTestTraceparents)-1 {
time.Sleep(5 * time.Millisecond)
}
}
wg.Wait()

containerID := testContainerID(ct, cl, logEnricherNodeJSConstants.containerImage)
require.NotEmpty(ct, containerID, "could not find test container ID")
logs := containerLogs(ct, cl, containerID)
require.NotEmpty(ct, logs)

// Find the last log-position of each injected trace_id (most recent retry).
lastPos := make(map[string]int, len(nodejsTestTraceparents))
lastSpanID := make(map[string]string, len(nodejsTestTraceparents))
for i, line := range logs {
var fields map[string]string
if json.Unmarshal([]byte(line), &fields) != nil {
continue
}
if tid, ok := fields["trace_id"]; ok {
lastPos[tid] = i
lastSpanID[tid] = fields["span_id"]
}
}

// Every injected trace_id must appear with a non-empty span_id.
for _, tp := range nodejsTestTraceparents {
_, found := lastPos[tp.traceID]
assert.True(ct, found, "no enriched log line found for trace_id %s", tp.traceID)
if found {
assert.NotEmpty(ct, lastSpanID[tp.traceID], "span_id missing for trace_id %s", tp.traceID)
}
}

// Log lines must appear in the same order requests were made.
// Using last-occurrence positions compares within the most recent batch.
for i := 0; i < len(nodejsTestTraceparents)-1; i++ {
a, b := nodejsTestTraceparents[i], nodejsTestTraceparents[i+1]
posA, okA := lastPos[a.traceID]
posB, okB := lastPos[b.traceID]
if okA && okB {
assert.Less(ct, posA, posB,
"trace_id %s should appear before %s in logs (request order)",
a.traceID, b.traceID)
}
}
}, testTimeout, 500*time.Millisecond)
}

func testLogEnricher(t *testing.T, constants testServerConstants) {
waitForTestComponentsNoMetrics(t, constants.url+constants.smokeEndpoint)

Expand Down
13 changes: 13 additions & 0 deletions internal/test/integration/suites_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,19 @@ func TestSuite_LogEnricherGoGRPC(t *testing.T) {
require.NoError(t, compose.Close())
}

func TestSuite_LogEnricherNodeJS(t *testing.T) {
compose, err := docker.ComposeSuite("docker-compose-log-enricher.yml", path.Join(pathOutput, "test-suite-log-enricher-nodejs.log"))
require.NoError(t, err)

compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=3030`, `OTEL_EBPF_EXECUTABLE_PATH=`)
require.NoError(t, compose.Up())

t.Run("Log Enricher Node.js", func(t *testing.T) {
testLogEnricherNodeJS(t)
})
require.NoError(t, compose.Close())
}

// Helpers

var lockdownPath = "/sys/kernel/security/lockdown"
Expand Down
21 changes: 18 additions & 3 deletions pkg/internal/nodejs/fdextractor.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ net.Server.prototype.emit = orig.serverEmit;
net.Socket.prototype.connect = orig.socketConnect;
net.Socket.prototype.write = orig.socketWrite;

const { AsyncLocalStorage } = require('async_hooks');
const { AsyncLocalStorage, createHook } = require('async_hooks');

const debug_enabled = false;

Expand Down Expand Up @@ -44,6 +44,8 @@ net.Server.prototype.emit = function (event, ...args) {
return orig.serverEmit.call(this, event, ...args);
};

const pad4 = n => String(n).padStart(4, '0');

function correlate(incomingFd, outFd, socket) {
if (incomingFd < 0 || outFd < 0 || incomingFd === outFd) {
return Promise.resolve();
Expand All @@ -58,8 +60,6 @@ function correlate(incomingFd, outFd, socket) {
);
}

const pad4 = n => String(n).padStart(4, '0');

try {
fs.accessSync(`/dev/null/obi/${pad4(incomingFd)}${pad4(outFd)}`)
} catch (err) {
Expand Down Expand Up @@ -101,3 +101,18 @@ net.Socket.prototype.write = function (data, ...rest) {

return doWrite();
};

// Signal the BPF layer before each async callback so it can restore the correct
// trace context for this request into traces_ctx_v1.
// fs.accessSync is safe inside async_hooks callbacks: synchronous fs operations
// do not create AsyncWrap objects and therefore do not re-trigger this hook.
createHook({
before() {
const store = als.getStore();
if (store && store.incomingFd != null && store.incomingFd >= 0) {
try {
fs.accessSync(`/dev/null/obi-ctx/${pad4(store.incomingFd)}`);
} catch (_) {}
}
},
}).enable();