Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8e872fb
WIP: Improvements to large buffer handling
grcevski Feb 20, 2026
4004bc7
implement support for compressed responses
grcevski Feb 20, 2026
e5c2621
add traces support for openAI
grcevski Feb 23, 2026
8aad6d4
add test code
grcevski Feb 23, 2026
e4269bf
add support for conversations, items, data
grcevski Feb 23, 2026
7cf980b
more fixes
grcevski Feb 23, 2026
b966704
make sensitive attributes optional, add tests
grcevski Feb 23, 2026
d56db26
more tests
grcevski Feb 23, 2026
8d60460
integration test
grcevski Feb 24, 2026
51985af
lints and fixes
grcevski Feb 24, 2026
14a3af9
Merge branch 'main' into improve_large_buffers
grcevski Feb 24, 2026
2bb9d35
fix bugs
grcevski Feb 24, 2026
80a40e2
update config schema
grcevski Feb 24, 2026
9c3cd13
update notices
grcevski Feb 24, 2026
18af86a
fix merge issue
grcevski Feb 24, 2026
5ef78b6
fix icoverage grep failure
grcevski Feb 24, 2026
eb58d99
better fix
grcevski Feb 24, 2026
1b19f9f
fix bug, unit test, code review comments
grcevski Feb 25, 2026
ef580fb
improve loop
grcevski Feb 25, 2026
dc3966c
Update pkg/ebpf/common/http/responses.go
grcevski Feb 25, 2026
7b44dbb
more review feedback
grcevski Feb 25, 2026
7056ae9
Merge branch 'improve_large_buffers' of github.com:grcevski/opentelem…
grcevski Feb 25, 2026
9a41f80
review feedback
grcevski Feb 25, 2026
8fd63df
fix
grcevski Feb 25, 2026
5d81bcf
remove binary file
grcevski Feb 25, 2026
bf6ec0e
update comment
grcevski Feb 25, 2026
9ebf022
update schema
grcevski Feb 25, 2026
f5a8208
fix parsing issue
grcevski Feb 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ itest-coverage-data:
# replace the unexpected /src/cmd/obi/main.go file by the module path
sed 's/^\/src\/cmd\//github.com\/open-telemetry\/opentelemetry-ebpf-instrumentation\/cmd\//' $(TEST_OUTPUT)/itest-covdata.raw.txt > $(TEST_OUTPUT)/itest-covdata.all.txt
# exclude generated files from coverage data
grep -vE $(EXCLUDE_COVERAGE_FILES) $(TEST_OUTPUT)/itest-covdata.all.txt > $(TEST_OUTPUT)/itest-covdata.txt
grep -vE $(EXCLUDE_COVERAGE_FILES) $(TEST_OUTPUT)/itest-covdata.all.txt > $(TEST_OUTPUT)/itest-covdata.txt || true

.PHONY: oats-prereq
oats-prereq: $(GINKGO) docker-generate
Expand Down Expand Up @@ -538,8 +538,13 @@ oats-test-mongo: oats-prereq
mkdir -p internal/test/oats/mongo/$(TEST_OUTPUT)/run
cd internal/test/oats/mongo && TESTCASE_TIMEOUT=5m TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r

.PHONY: oats-test-ai
oats-test-ai: oats-prereq
mkdir -p internal/test/oats/ai/$(TEST_OUTPUT)/run
cd internal/test/oats/ai && TESTCASE_TIMEOUT=5m TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r

.PHONY: oats-test
oats-test: oats-test-sql oats-test-mongo oats-test-redis oats-test-kafka oats-test-http
oats-test: oats-test-sql oats-test-mongo oats-test-redis oats-test-kafka oats-test-http oats-test-ai
$(MAKE) itest-coverage-data

.PHONY: oats-test-debug
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright (c) 2009, 2010, 2013-2016 by the Brotli Authors.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
7 changes: 5 additions & 2 deletions bpf/common/large_buffers.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ enum {
// The actual size is "event size + payload". Since the payload
// is guaranteed to be a power of 2, we take the next power of 2
// of the maximum payload size as a guard.
k_large_buf_max_size = 1 << 14, // 16K
k_large_buf_max_size = 1 << 15, // 32K
k_large_buf_max_size_mask = k_large_buf_max_size - 1,

// Maximum size for a large buffer payload.
k_large_buf_payload_max_size = 1 << 13, // 8K
k_large_buf_payload_max_size = 1 << 14, // 16K
k_large_buf_payload_max_size_mask = k_large_buf_payload_max_size - 1,

// Absolute maximum number of bytes that we'll send; the data is sent as consecutive smaller chunks
k_large_buffer_read_limit = 1 << 16, // 64K
};

SCRATCH_MEM_SIZED(http_large_buffers, k_large_buf_max_size);
Expand Down
24 changes: 23 additions & 1 deletion bpf/generictracer/k_send_receive.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,46 @@
#include <bpfcore/vmlinux.h>
#include <bpfcore/bpf_helpers.h>

#include <common/connection_info.h>
#include <common/protocol_defs.h>
#include <common/ringbuf.h>

#include <generictracer/k_tracer_defs.h>

#include <generictracer/maps/active_recv_args.h>
#include <generictracer/maps/active_send_args.h>
#include <generictracer/maps/active_send_sock_args.h>

static __always_inline void ensure_sent_event(u64 id, u64 *sock_p) {
// Reports whether `direction` lines up with the HTTP event currently tracked
// for `p_conn` in the `ongoing_http` map.
//
// Returns a non-zero value only when an entry exists, it has not been
// submitted yet, and either:
//   - its type is EVENT_HTTP_REQUEST and `direction` is TCP_SEND, or
//   - its type is EVENT_HTTP_CLIENT and `direction` is TCP_RECV.
// Returns 0 in every other case, including when no entry is found.
static __always_inline u8 same_direction(pid_connection_info_t *p_conn, u8 direction) {
    http_info_t *ongoing = bpf_map_lookup_elem(&ongoing_http, p_conn);

    // Nothing tracked, or the event was already submitted: no match.
    if (!ongoing || ongoing->submitted) {
        return false;
    }

    const u8 request_on_send = (ongoing->type == EVENT_HTTP_REQUEST) && (direction == TCP_SEND);
    const u8 client_on_recv = (ongoing->type == EVENT_HTTP_CLIENT) && (direction == TCP_RECV);

    return request_on_send || client_on_recv;
}

// Gives a possibly delayed HTTP request a chance to be finished before new
// traffic is handled.
//
// Does nothing when `high_request_volume` is set. Otherwise two lookups are
// made, in order: `active_send_args` keyed by `id`, then
// `active_send_sock_args` keyed by `sock_p`. For each entry found:
//   - if same_direction() reports a match for the entry's connection and
//     `direction`, the function returns immediately (a match on the first
//     lookup therefore also skips the second one);
//   - otherwise the entry's connection is handed to
//     finish_possible_delayed_http_request().
static __always_inline void ensure_sent_event(u64 id, u64 *sock_p, u8 direction) {
    if (high_request_volume) {
        return;
    }

    // First candidate: the send arguments recorded under this id.
    send_args_t *by_id = (send_args_t *)bpf_map_lookup_elem(&active_send_args, &id);
    if (by_id) {
        bpf_dbg_printk("Checking if we need to finish the request per thread id");

        if (same_direction(&by_id->p_conn, direction)) {
            return;
        }

        finish_possible_delayed_http_request(&by_id->p_conn);
    }

    // see if we match on another thread, but same sock *
    send_args_t *by_sock = (send_args_t *)bpf_map_lookup_elem(&active_send_sock_args, sock_p);
    if (!by_sock) {
        return;
    }

    bpf_dbg_printk("Checking if we need to finish the request per socket");
    if (same_direction(&by_sock->p_conn, direction)) {
        return;
    }
    finish_possible_delayed_http_request(&by_sock->p_conn);
}
Expand Down
45 changes: 31 additions & 14 deletions bpf/generictracer/k_tracer.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <common/iov_iter.h>
#include <common/msg_buffer.h>
#include <common/dns.h>
#include <common/protocol_defs.h>
#include <common/sock_port_ns.h>
#include <common/sockaddr.h>
#include <common/ssl_helpers.h>
Expand All @@ -24,6 +25,7 @@
#include <generictracer/maps/active_connect_args.h>
#include <generictracer/maps/listening_ports.h>
#include <generictracer/maps/tcp_connection_map.h>
#include <generictracer/protocol_common.h>
#include <generictracer/protocol_http.h>
#include <generictracer/protocol_http2.h>
#include <generictracer/protocol_mysql.h>
Expand Down Expand Up @@ -698,7 +700,7 @@ static __always_inline void setup_recvmsg(u64 id, struct sock *sk, struct msghdr
// sent through the same socket. This mainly happens if the server overlays virtual
// threads in the runtime.
u64 sock_p = (u64)sk;
ensure_sent_event(id, &sock_p);
ensure_sent_event(id, &sock_p, TCP_RECV);
connect_ssl_to_sock(id, sk, TCP_RECV);

recv_args_t args = {
Expand Down Expand Up @@ -1162,10 +1164,17 @@ int obi_handle_buf_with_args(void *ctx) {
} else { // large request tracking and generic TCP
http_info_t *info = bpf_map_lookup_elem(&ongoing_http, &args->pid_conn);

bpf_d_printk("http info %llx, submitted %d, still reading %d",
info,
(info) ? info->submitted : 0,
(info) ? still_reading(info) : 0);

if (info && !info->submitted) {
u8 reading = still_reading(info);
u8 responding = still_responding(info);
// Still reading checks if we are processing buffers of a HTTP request
// that has started, but we haven't seen a response yet.
if (still_reading(info)) {
if (reading || responding) {
// Packets are split into chunks if OBI injected the Traceparent
// Make sure you look for split packets containing the real Traceparent.
// Essentially, when a packet is extended by our sock_msg program and
Expand All @@ -1175,7 +1184,7 @@ int obi_handle_buf_with_args(void *ctx) {
// scan for the incoming 'Traceparent' header. If they are not reassembled
// we'll see something like this:
// [before the injected header],[70 bytes for 'Traceparent...'],[the rest].
if (is_traceparent(args->small_buf)) {
if (reading && is_traceparent(args->small_buf)) {
unsigned char *buf = tp_char_buf();
if (buf) {
bpf_probe_read(buf, TP_SIZE, (unsigned char *)args->u_buf);
Expand All @@ -1202,17 +1211,25 @@ int obi_handle_buf_with_args(void *ctx) {
}
}

http_send_large_buffer(
info,
(void *)args->u_buf,
args->bytes_len,
// Packet type can't be reliably determined in HTTP split packets. This should
// always be a request.
PACKET_TYPE_REQUEST,
args->direction,
k_large_buf_action_append);
} else if (still_responding(info)) {
info->end_monotime_ns = bpf_ktime_get_ns();
u8 packet_type = PACKET_TYPE_REQUEST;
if (responding) {
packet_type = PACKET_TYPE_RESPONSE;
}

http_send_large_buffer(info,
(void *)args->u_buf,
args->bytes_len,
packet_type,
args->direction,
k_large_buf_action_append);

if (reading) {
info->len += args->bytes_len;
} else if (responding) {
info->end_monotime_ns = bpf_ktime_get_ns();
bpf_d_printk("bytes len %d, new bytes %d", info->resp_len, args->bytes_len);
info->resp_len += args->bytes_len;
}
}
} else if (!info) {
// SSL requests will see both TCP traffic and text traffic, ignore the TCP if
Expand Down
2 changes: 1 addition & 1 deletion bpf/generictracer/k_unix_sock.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ int BPF_KPROBE(obi_kprobe_unix_stream_recvmsg,
// sent through the same socket. This mainly happens if the server overlays virtual
// threads in the runtime.
u64 sock_p = (u64)sk;
ensure_sent_event(id, &sock_p);
ensure_sent_event(id, &sock_p, TCP_RECV);

recv_args_t args = {
.sock_ptr = (u64)sk,
Expand Down
69 changes: 46 additions & 23 deletions bpf/generictracer/protocol_http.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <bpfcore/vmlinux.h>
#include <bpfcore/bpf_builtins.h>
#include <bpfcore/bpf_helpers.h>
#include <bpfcore/utils.h>

#include <common/common.h>
#include <common/http_types.h>
Expand Down Expand Up @@ -441,22 +442,15 @@ static __always_inline void process_http_response(http_info_t *info, const unsig
static __always_inline void handle_http_response(unsigned char *small_buf,
pid_connection_info_t *pid_conn,
http_info_t *info,
int orig_len,
u8 direction,
u8 ssl) {
int orig_len) {
process_http_response(info, small_buf);
cleanup_http_request_data(pid_conn, info);

if ((direction != TCP_SEND) ||
high_request_volume /*|| (ssl != NO_SSL) || (orig_len < KPROBES_LARGE_RESPONSE_LEN)*/) {
if (high_request_volume) {
finish_http(info, pid_conn);
} else {
if (ssl) {
finish_http(info, pid_conn);
} else {
bpf_dbg_printk("Delaying finish http for large request, orig_len=%d", orig_len);
info->delayed = 1;
}
bpf_dbg_printk("Delaying finish http for large request, orig_len=%d", orig_len);
info->delayed = 1;
}
}

Expand All @@ -483,22 +477,49 @@ static __always_inline int http_send_large_buffer(http_info_t *req,
large_buf->action = action;
large_buf->tp = req->tp;

large_buf->len = bytes_len;
if (large_buf->len >= http_buffer_size) {
large_buf->len = http_buffer_size;
bpf_dbg_printk("WARN: buffer is full, truncating data");
req->has_large_buffers = true;

u32 available_bytes = bytes_len;
// limit by the userspace requested size
if (available_bytes > http_buffer_size) {
available_bytes = http_buffer_size;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be misunderstanding so please bear with me.

http_buffer_size is always meant to be less than k_large_buf_payload_max_size (i.e. k_large_buf_payload_max_size is a ceiling).

So capping available_bytes to http_buffer_size means that you will always end up sending a single large buffer (niter == 1) and I am assuming the intent here is to slice available_bytes into N large buffers, so I think this block should be removed - then see below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessarily, it's set by userspace and while there's a cap on the config setting, I don't want to leave it up to userspace to decide.

User space can set 2K, or 200K. If it's 2K we should only send 2K. If it's 200K, it should send 64K.

// limit by the maximum bytes we can ever export
bpf_clamp_umax(available_bytes, k_large_buffer_read_limit);

bpf_probe_read(large_buf->buf, large_buf->len & k_large_buf_payload_max_size_mask, u_buf);
bpf_dbg_printk("sending large buffer, total size=%d, packet_type=%d, direction %d",
bytes_len,
packet_type,
direction);

u32 total_size = sizeof(tcp_large_buffer_t);
total_size += large_buf->len > sizeof(void *) ? large_buf->len : sizeof(void *);
const uint32_t niter = (available_bytes / k_large_buf_payload_max_size) +
Comment thread
mmat11 marked this conversation as resolved.
((available_bytes % k_large_buf_payload_max_size) > 0);
Comment thread
rafaelroquetto marked this conversation as resolved.

req->has_large_buffers = true;
int b = 0;
for (; b < niter; b++) {
const u32 offset = b * k_large_buf_payload_max_size;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and then this becomes

Suggested change
const u32 offset = b * k_large_buf_payload_max_size;
const u32 offset = b * http_buffer_size;

otherwise your stride is potentially larger than http_buffer_size and you skip bytes. I think this only worked so far because we are consistently using k_large_buf_payload_max_size to read, meaning we are not respecting http_buffer_size and always sending the maximum number of bytes

if (offset >= k_large_buffer_read_limit) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (offset >= k_large_buffer_read_limit) {
if (offset + read_size >= k_large_buffer_read_limit) {

if we can read at most k_large_buffer_read_limit in total, we need to account for the bytes already read (i.e. offset bytes) + the bytes we are about to read, otherwise we can overflow.

break;
}
u32 read_size = available_bytes;
Comment thread
rafaelroquetto marked this conversation as resolved.
bpf_clamp_umax(read_size, k_large_buf_payload_max_size);
Comment thread
rafaelroquetto marked this conversation as resolved.
bpf_probe_read(large_buf->buf, read_size, (void *)(&u_buf[offset]));

// left here intentionally for debugging
// bpf_dbg_printk("sending large buffer, size=%d, action=%d", read_size, action);

large_buf->len = read_size;

bpf_dbg_printk("sending large buffer, size=%d", bytes_len);
u32 total_size = sizeof(tcp_large_buffer_t);
total_size += large_buf->len > sizeof(void *) ? large_buf->len : sizeof(void *);

bpf_clamp_umax(total_size, k_large_buf_max_size);
bpf_ringbuf_output(&events, large_buf, total_size, get_flags());

available_bytes -= read_size;
large_buf->action = k_large_buf_action_append;
}

bpf_ringbuf_output(&events, large_buf, total_size & k_large_buf_max_size_mask, get_flags());
return 0;
}

Expand Down Expand Up @@ -659,9 +680,9 @@ __obi_protocol_http(struct pt_regs *ctx, unsigned char *(*tp_loop_fn)(unsigned c
args->packet_type,
args->direction,
k_large_buf_action_init);
handle_http_response(
args->small_buf, &args->pid_conn, info, args->bytes_len, args->direction, args->ssl);
handle_http_response(args->small_buf, &args->pid_conn, info, args->bytes_len);
} else if (still_reading(info)) {
// print here
http_send_large_buffer(info,
(void *)args->u_buf,
args->bytes_len,
Expand All @@ -670,7 +691,9 @@ __obi_protocol_http(struct pt_regs *ctx, unsigned char *(*tp_loop_fn)(unsigned c
k_large_buf_action_append);

info->len += args->bytes_len;
} else if (still_responding(info)) {
info->end_monotime_ns = bpf_ktime_get_ns();
info->resp_len += args->bytes_len;
}

return 0;
Expand Down
2 changes: 1 addition & 1 deletion bpf/generictracer/ssl_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ static __always_inline void cleanup_complete_ssl_server_trace(http_info_t *info,
static __always_inline void
finish_possible_delayed_tls_http_request(pid_connection_info_t *pid_conn, void *ssl) {
http_info_t *info = bpf_map_lookup_elem(&ongoing_http, pid_conn);
if (info && (info->delayed || info->submitted)) {
if (info && info->submitted) {
// we need to check for server request, the same thread
// could be handling both client and server requests
if (info->type == EVENT_HTTP_REQUEST) {
Expand Down
16 changes: 15 additions & 1 deletion docs/config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@
}
},
"type": "object",
"description": "Per-protocol data buffer size in bytes. Max: 8192 bytes. Default: 0 (disabled)."
"description": "Per-protocol data buffer size in bytes. Max: 64K bytes for HTTP. 8K bytes for other protocols. Default: 0 (disabled)."
},
"EBPFTracer": {
"properties": {
Expand Down Expand Up @@ -681,6 +681,10 @@
"$ref": "#/$defs/GraphQLConfig",
"description": "GraphQL payload extraction and parsing"
},
"openai": {
"$ref": "#/$defs/OpenAIConfig",
"description": "OpenAI payload extraction"
},
"sqlpp": {
"$ref": "#/$defs/SQLPPConfig",
"description": "SQL++ payload extraction and parsing (Couchbase and other SQL++ databases)"
Expand Down Expand Up @@ -1327,6 +1331,16 @@
},
"type": "object"
},
"OpenAIConfig": {
"properties": {
"enabled": {
"type": "boolean",
"description": "Enable OpenAI payload extraction and parsing",
"x-env-var": "OTEL_EBPF_HTTP_OPENAI_ENABLED"
}
},
"type": "object"
},
"PayloadExtraction": {
"properties": {
"http": {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.7

require (
github.com/AlessandroPomponio/go-gibberish v0.0.0-20191004143433-a2d4156f0396
github.com/andybalholm/brotli v1.2.0
github.com/caarlos0/env/v9 v9.0.0
github.com/cilium/ebpf v0.20.0
github.com/containers/common v0.64.2
Expand All @@ -24,6 +25,7 @@ require (
github.com/hashicorp/go-version v1.8.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/invopop/jsonschema v0.13.0
github.com/klauspost/compress v1.18.2
github.com/ory/dockertest/v3 v3.12.0
github.com/oschwald/maxminddb-golang v1.13.1
github.com/prometheus/client_golang v1.23.2
Expand Down Expand Up @@ -164,7 +166,6 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
github.com/knadh/koanf/providers/confmap v1.0.0 // indirect
Expand Down
Loading