diff --git a/pkg/ebpf/common/common.go b/pkg/ebpf/common/common.go index ed5753bafa..a0f49eaadf 100644 --- a/pkg/ebpf/common/common.go +++ b/pkg/ebpf/common/common.go @@ -29,6 +29,7 @@ import ( "go.opentelemetry.io/obi/pkg/ebpf/common/dnsparser" "go.opentelemetry.io/obi/pkg/internal/ebpf/kafkaparser" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" "go.opentelemetry.io/obi/pkg/pipe/msg" ) @@ -174,7 +175,7 @@ type EBPFParseContext struct { h2c *lru.Cache[uint64, h2Connection] redisDBCache *simplelru.LRU[BpfConnectionInfoT, int] couchbaseBucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo] - largeBuffers *expirable.LRU[largeBufferKey, *LargeBuffer] + largeBuffers *expirable.LRU[largeBufferKey, *largebuf.LargeBuffer] mongoRequestCache PendingMongoDBRequests mysqlPreparedStatements *simplelru.LRU[mysqlPreparedStatementsKey, string] postgresPreparedStatements *simplelru.LRU[postgresPreparedStatementsKey, string] @@ -213,7 +214,7 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request. ) h2c, _ := lru.New[uint64, h2Connection](1024 * 10) - largeBuffers := expirable.NewLRU[largeBufferKey, *LargeBuffer](1024, nil, 5*time.Minute) + largeBuffers := expirable.NewLRU[largeBufferKey, *largebuf.LargeBuffer](1024, nil, 5*time.Minute) if cfg != nil { protocolDebug = cfg.ProtocolDebug diff --git a/pkg/ebpf/common/couchbase_detect_transform.go b/pkg/ebpf/common/couchbase_detect_transform.go index 9ba574c29d..2c87eaa14c 100644 --- a/pkg/ebpf/common/couchbase_detect_transform.go +++ b/pkg/ebpf/common/couchbase_detect_transform.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/couchbasekv" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) // CouchbaseInfo holds parsed Couchbase memcached binary protocol information. @@ -31,7 +32,7 @@ type CouchbaseInfo struct { // ProcessPossibleCouchbaseEvent attempts to parse the event as a Couchbase memcached binary protocol event. // Returns a slice of CouchbaseInfo if successful, along with a boolean indicating if the event should be ignored, // and an error if parsing failed. Multiple packets may be present in a single TCP segment due to pipelining. -func ProcessPossibleCouchbaseEvent(event *TCPRequestInfo, requestBuf *LargeBuffer, responseBuf *LargeBuffer, bucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]) (*CouchbaseInfo, bool, error) { +func ProcessPossibleCouchbaseEvent(event *TCPRequestInfo, requestBuf *largebuf.LargeBuffer, responseBuf *largebuf.LargeBuffer, bucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]) (*CouchbaseInfo, bool, error) { reqRaw := requestBuf.UnsafeView() respRaw := responseBuf.UnsafeView() info, ignore, err := processCouchbaseEvent(event.ConnInfo, reqRaw, respRaw, bucketCache) diff --git a/pkg/ebpf/common/couchbase_detect_transform_test.go b/pkg/ebpf/common/couchbase_detect_transform_test.go index c0ffccfa47..0cfbcc5241 100644 --- a/pkg/ebpf/common/couchbase_detect_transform_test.go +++ b/pkg/ebpf/common/couchbase_detect_transform_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/couchbasekv" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) // Helper functions to create Couchbase packets for testing @@ -551,7 +552,7 @@ func TestProcessPossibleCouchbaseEventReversedBuffers(t *testing.T) { } // Test with buffers in correct order - info, ignore, err := ProcessPossibleCouchbaseEvent(event, NewLargeBufferFrom(requestBuf), NewLargeBufferFrom(responseBuf), cache) + info, ignore, err := ProcessPossibleCouchbaseEvent(event, largebuf.NewLargeBufferFrom(requestBuf), largebuf.NewLargeBufferFrom(responseBuf), cache) require.NoError(t, err) assert.False(t, ignore) assert.Equal(t, "GET", info.Operation) @@ -563,7 +564,7 @@ func TestProcessPossibleCouchbaseEventReversedBuffers(t *testing.T) { ConnInfo: connInfo, Direction: 1, } - info, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(garbageBuf), NewLargeBufferFrom(requestBuf), cache) + info, ignore, err = ProcessPossibleCouchbaseEvent(event2, largebuf.NewLargeBufferFrom(garbageBuf), largebuf.NewLargeBufferFrom(requestBuf), cache) require.NoError(t, err) assert.False(t, ignore) require.NotNil(t, info) @@ -594,7 +595,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) { selectBucket1Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeSelectBucket, couchbasekv.StatusSuccess, "") event1 := &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1} - _, ignore, err := ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(selectBucket1Req), NewLargeBufferFrom(selectBucket1Resp), cache) + _, ignore, err := ProcessPossibleCouchbaseEvent(event1, largebuf.NewLargeBufferFrom(selectBucket1Req), largebuf.NewLargeBufferFrom(selectBucket1Resp), cache) require.NoError(t, err) assert.True(t, ignore) // SELECT_BUCKET is ignored @@ -603,7 +604,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) { selectBucket2Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeSelectBucket, couchbasekv.StatusSuccess, "") event2 := &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1} - _, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(selectBucket2Req), NewLargeBufferFrom(selectBucket2Resp), cache) + _, ignore, err = ProcessPossibleCouchbaseEvent(event2, largebuf.NewLargeBufferFrom(selectBucket2Req), largebuf.NewLargeBufferFrom(selectBucket2Resp), cache) require.NoError(t, err) assert.True(t, ignore) @@ -612,7 +613,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) { getCollID1Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeCollectionsGetID, couchbasekv.StatusSuccess, "") event1 = &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1} - _, ignore, err = ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(getCollID1Req), NewLargeBufferFrom(getCollID1Resp), cache) + _, ignore, err = ProcessPossibleCouchbaseEvent(event1, largebuf.NewLargeBufferFrom(getCollID1Req), largebuf.NewLargeBufferFrom(getCollID1Resp), cache) require.NoError(t, err) assert.True(t, ignore) @@ -621,7 +622,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) { getCollID2Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeCollectionsGetID, couchbasekv.StatusSuccess, "") event2 = &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1} - _, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(getCollID2Req), NewLargeBufferFrom(getCollID2Resp), cache) + _, ignore, err = ProcessPossibleCouchbaseEvent(event2, largebuf.NewLargeBufferFrom(getCollID2Req), largebuf.NewLargeBufferFrom(getCollID2Resp), cache) require.NoError(t, err) assert.True(t, ignore) @@ -631,7 +632,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) { // GET on connection 1 should have bucket1/scope1/coll1 event1 = &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1} - info1, ignore, err := ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(getReq), NewLargeBufferFrom(getResp), cache) + info1, ignore, err := ProcessPossibleCouchbaseEvent(event1, largebuf.NewLargeBufferFrom(getReq), largebuf.NewLargeBufferFrom(getResp), cache) require.NoError(t, err) assert.False(t, ignore) require.NotNil(t, info1) @@ -641,7 +642,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) { // GET on connection 2 should have bucket2/scope2/coll2 event2 = &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1} - info2, ignore, err := ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(getReq), NewLargeBufferFrom(getResp), cache) + info2, ignore, err := ProcessPossibleCouchbaseEvent(event2, largebuf.NewLargeBufferFrom(getReq), largebuf.NewLargeBufferFrom(getResp), cache) require.NoError(t, err) assert.False(t, ignore) require.NotNil(t, info2) diff --git a/pkg/ebpf/common/fast_cgi_detect_transform.go b/pkg/ebpf/common/fast_cgi_detect_transform.go index 273fe0ccb4..53997bd39a 100644 --- a/pkg/ebpf/common/fast_cgi_detect_transform.go +++ b/pkg/ebpf/common/fast_cgi_detect_transform.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) const ( @@ -108,14 +109,14 @@ func parseCGITable(b []byte) map[string]string { return res } -func maybeFastCGI(b *LargeBuffer) bool { +func maybeFastCGI(b *largebuf.LargeBuffer) bool { if b.Len() <= fastCGIRequestHeaderLen { return false } return bytes.Contains(b.UnsafeView(), []byte(requestMethodKey)) } -func parseHeader(b *LargeBuffer) ([]byte, error) { +func parseHeader(b *largebuf.LargeBuffer) ([]byte, error) { r := b.NewReader() for { if r.Remaining() < fastCGIRequestHeaderLen { @@ -144,7 +145,7 @@ func parseHeader(b *LargeBuffer) ([]byte, error) { } } -func detectFastCGI(b, rb *LargeBuffer) (string, string, int) { +func detectFastCGI(b, rb *largebuf.LargeBuffer) (string, string, int) { raw, err := parseHeader(b) if err != nil { return "", "", -1 diff --git a/pkg/ebpf/common/fast_cgi_detect_transform_test.go b/pkg/ebpf/common/fast_cgi_detect_transform_test.go index f1857955f4..f0e08125fb 100644 --- a/pkg/ebpf/common/fast_cgi_detect_transform_test.go +++ b/pkg/ebpf/common/fast_cgi_detect_transform_test.go @@ -7,6 +7,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func TestMaybeFastCGI(t *testing.T) { @@ -45,7 +47,7 @@ func TestMaybeFastCGI(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ilen := min(len(tt.input), tt.inputLen) - res := maybeFastCGI(NewLargeBufferFrom(tt.input[0:ilen])) + res := maybeFastCGI(largebuf.NewLargeBufferFrom(tt.input[0:ilen])) assert.Equal(t, tt.expected, res) }) } @@ -214,7 +216,7 @@ func TestDetectFastCGI(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ilen := min(len(tt.input), tt.inputLen) olen := min(len(tt.output), tt.outputLen) - method, path, status := detectFastCGI(NewLargeBufferFrom(tt.input[0:ilen]), NewLargeBufferFrom(tt.output[0:olen])) + method, path, status := detectFastCGI(largebuf.NewLargeBufferFrom(tt.input[0:ilen]), largebuf.NewLargeBufferFrom(tt.output[0:olen])) assert.Equal(t, tt.expectedMethod, method) assert.Equal(t, tt.expectedPath, path) assert.Equal(t, tt.expectedResult, status) diff --git a/pkg/ebpf/common/go_kafka_transform.go b/pkg/ebpf/common/go_kafka_transform.go index f71b84a906..0ae7843475 100644 --- a/pkg/ebpf/common/go_kafka_transform.go +++ b/pkg/ebpf/common/go_kafka_transform.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func ReadGoSaramaRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, error) { @@ -19,7 +20,7 @@ func ReadGoSaramaRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, er return request.Span{}, true, err } - info, ignore, err := ProcessKafkaRequest(NewLargeBufferFrom(event.Buf[:]).NewReader(), nil) + info, ignore, err := ProcessKafkaRequest(largebuf.NewLargeBufferFrom(event.Buf[:]), nil) if err == nil && !ignore { return GoKafkaSaramaToSpan(event, info), false, nil diff --git a/pkg/ebpf/common/http2grpc_transform.go b/pkg/ebpf/common/http2grpc_transform.go index 307e0fb18c..d7dcccdb78 100644 --- a/pkg/ebpf/common/http2grpc_transform.go +++ b/pkg/ebpf/common/http2grpc_transform.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/bhpack" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) type BPFHTTP2Info BpfHttp2GrpcRequestT @@ -544,7 +545,7 @@ func isLikelyHTTP2(data []uint8, eventLen int) bool { return false } -func isHTTP2(data *LargeBuffer, eventLen int) bool { +func isHTTP2(data *largebuf.LargeBuffer, eventLen int) bool { // Parsing HTTP2 frames with the Go HTTP2/gRPC parser is very expensive. // Therefore, we replicate some of our HTTP2 frame reader from eBPF here to // check if this payload even remotely looks like HTTP2/gRPC, e.g. we must @@ -553,7 +554,8 @@ func isHTTP2(data *LargeBuffer, eventLen int) bool { return false } - framer := http2.NewFramer(io.Discard, data.NewReader()) + dataReader := data.NewReader() + framer := http2.NewFramer(io.Discard, &dataReader) for { f, err := framer.ReadFrame() diff --git a/pkg/ebpf/common/http2grpc_transform_test.go b/pkg/ebpf/common/http2grpc_transform_test.go index 9a3a43a19b..a24cb2485f 100644 --- a/pkg/ebpf/common/http2grpc_transform_test.go +++ b/pkg/ebpf/common/http2grpc_transform_test.go @@ -10,6 +10,7 @@ import ( "golang.org/x/net/http2" "go.opentelemetry.io/obi/pkg/internal/ebpf/bhpack" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) var isHTTP2TestCases = []struct { @@ -117,7 +118,7 @@ func TestHTTP2QuickDetection(t *testing.T) { t.Run(tt.name, func(t *testing.T) { res := isLikelyHTTP2(tt.input, tt.inputLen) assert.Equal(t, tt.expectedQuick, res) - res1 := isHTTP2(NewLargeBufferFrom(tt.input), tt.inputLen) + res1 := isHTTP2(largebuf.NewLargeBufferFrom(tt.input), tt.inputLen) assert.Equal(t, tt.expected, res1) }) } @@ -535,7 +536,7 @@ func TestHandleHeaderField(t *testing.T) { func BenchmarkIsHTTP2(b *testing.B) { for i := 0; i < b.N; i++ { for _, tt := range isHTTP2TestCases { - _ = isHTTP2(NewLargeBufferFrom(tt.input), tt.inputLen) + _ = isHTTP2(largebuf.NewLargeBufferFrom(tt.input), tt.inputLen) } } } diff --git a/pkg/ebpf/common/http_transform.go b/pkg/ebpf/common/http_transform.go index b8099cb468..3879688ed0 100644 --- a/pkg/ebpf/common/http_transform.go +++ b/pkg/ebpf/common/http_transform.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" ebpfhttp "go.opentelemetry.io/obi/pkg/ebpf/common/http" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func removeQuery(url string) string { @@ -191,7 +192,7 @@ func ReadHTTPInfoIntoSpan(parseCtx *EBPFParseContext, record *ringbuf.Record, fi func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (request.Span, bool, error) { var ( - requestBuffer, responseBuffer *LargeBuffer + requestBuffer, responseBuffer *largebuf.LargeBuffer hasResponse bool isClient = isClientEvent(event.Type) ) @@ -204,7 +205,7 @@ func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (reques requestBuffer = b } else { slog.Debug("missing large buffer for HTTP request", "traceID", event.Tp.TraceId, "conn", event.ConnInfo, "packetType", packetTypeRequest) - requestBuffer = NewLargeBufferFrom(event.Buf[:]) + requestBuffer = largebuf.NewLargeBufferFrom(event.Buf[:]) } b, ok = extractTCPLargeBuffer(parseCtx, event.Tp.TraceId, packetTypeResponse, directionByPacketType(packetTypeResponse, isClient), event.ConnInfo) @@ -215,7 +216,7 @@ func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (reques slog.Debug("missing large buffer for HTTP response", "traceID", event.Tp.TraceId, "conn", event.ConnInfo, "packetType", packetTypeResponse) } } else { - requestBuffer = NewLargeBufferFrom(event.Buf[:]) + requestBuffer = largebuf.NewLargeBufferFrom(event.Buf[:]) } if parseCtx != nil && !parseCtx.payloadExtraction.Enabled() { @@ -230,7 +231,8 @@ func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (reques } // http.ReadRequest requires a *bufio.Reader; that one allocation is unavoidable. - req, err := http.ReadRequest(bufio.NewReader(requestBuffer.NewReader())) + reqReader := requestBuffer.NewReader() + req, err := http.ReadRequest(bufio.NewReader(&reqReader)) resp, err2 := httpSafeParseResponse(responseBuffer, req) if err != nil || err2 != nil { slog.Debug("error while parsing http request or response, falling back to manual HTTP info parsing", "reqErr", err, "respErr", err2) @@ -243,19 +245,21 @@ func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (reques // HTTP response buffers might have been sent incomplete, before the full body. // Try to parse the original buffer first, if an EOF is encountered, append an empty // body to the buffer and try again. -func httpSafeParseResponse(responseBuffer *LargeBuffer, req *http.Request) (*http.Response, error) { - rd := bufio.NewReader(responseBuffer.NewReader()) +func httpSafeParseResponse(responseBuffer *largebuf.LargeBuffer, req *http.Request) (*http.Response, error) { + r := responseBuffer.NewReader() + rd := bufio.NewReader(&r) resp, err := http.ReadResponse(rd, req) if err != nil && errors.Is(err, io.ErrUnexpectedEOF) { - // Append empty body terminator and retry with a fresh reader. + // Append empty body terminator and retry, reusing the same reader (preserves scratch). responseBuffer.AppendChunk([]byte("\r\n\r\n")) - rd.Reset(responseBuffer.NewReader()) + r.Reset() + rd.Reset(&r) return http.ReadResponse(rd, req) } return resp, nil } -func httpRequestToSpan(event *BPFHTTPInfo, requestBuffer *LargeBuffer) request.Span { +func httpRequestToSpan(event *BPFHTTPInfo, requestBuffer *largebuf.LargeBuffer) request.Span { var ( result = HTTPInfo{BPFHTTPInfo: *event} bufHost string diff --git a/pkg/ebpf/common/kafka_detect_transform.go b/pkg/ebpf/common/kafka_detect_transform.go index 6b2bd2047b..301bce452a 100644 --- a/pkg/ebpf/common/kafka_detect_transform.go +++ b/pkg/ebpf/common/kafka_detect_transform.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/kafkaparser" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) type Operation int8 @@ -46,18 +47,11 @@ func (k Operation) String() string { // ProcessPossibleKafkaEvent processes a TCP packet and returns error if the packet is not a valid Kafka request. // Otherwise, return kafka.Info with the processed data. -func ProcessPossibleKafkaEvent(event *TCPRequestInfo, pkt *LargeBufferReader, rpkt *LargeBufferReader, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { +func ProcessPossibleKafkaEvent(event *TCPRequestInfo, pkt *largebuf.LargeBuffer, rpkt *largebuf.LargeBuffer, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { k, ok, err := ProcessKafkaEvent(pkt, rpkt, kafkaTopicUUIDToName) if err != nil { // If we are getting the information in the response buffer, the event // must be reversed and that's how we captured it. - // Reset readers before retrying with swapped buffers. - if pkt != nil { - pkt.Reset() - } - if rpkt != nil { - rpkt.Reset() - } k, ok, err = ProcessKafkaEvent(rpkt, pkt, kafkaTopicUUIDToName) if err == nil { reverseTCPEvent(event) @@ -66,16 +60,16 @@ func ProcessPossibleKafkaEvent(event *TCPRequestInfo, pkt *LargeBufferReader, rp return k, ok, err } -func ProcessKafkaEvent(pkt *LargeBufferReader, rpkt *LargeBufferReader, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { - hdr, err := kafkaparser.ParseKafkaRequestHeader(pkt) +func ProcessKafkaEvent(pkt *largebuf.LargeBuffer, rpkt *largebuf.LargeBuffer, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { + hdr, err := kafkaparser.NewKafkaRequestHeader(pkt) if err != nil { return nil, true, err } - switch hdr.APIKey { + switch hdr.APIKey() { case kafkaparser.APIKeyProduce: - return processProduceRequest(pkt, hdr) + return processProduceRequest(hdr) case kafkaparser.APIKeyFetch: - return processFetchRequest(pkt, hdr, kafkaTopicUUIDToName) + return processFetchRequest(hdr, kafkaTopicUUIDToName) case kafkaparser.APIKeyMetadata: return processMetadataResponse(rpkt, hdr, kafkaTopicUUIDToName) default: @@ -83,8 +77,13 @@ func ProcessKafkaEvent(pkt *LargeBufferReader, rpkt *LargeBufferReader, kafkaTop } } -func processProduceRequest(pkt *LargeBufferReader, hdr *kafkaparser.KafkaRequestHeader) (*KafkaInfo, bool, error) { - produceReq, err := kafkaparser.ParseProduceRequest(pkt, hdr) +func processProduceRequest(hdr kafkaparser.KafkaRequestHeader) (*KafkaInfo, bool, error) { + r, err := hdr.NewBodyReader() + if err != nil { + return nil, true, err + } + + produceReq, err := kafkaparser.ParseProduceRequest(&r, hdr) if err != nil { return nil, true, err } @@ -95,7 +94,7 @@ func processProduceRequest(pkt *LargeBufferReader, hdr *kafkaparser.KafkaRequest } } return &KafkaInfo{ - ClientID: hdr.ClientID, + ClientID: hdr.ClientID(), Operation: Produce, // TODO: handle multiple topics Topic: produceReq.Topics[0].Name, @@ -103,8 +102,13 @@ func processProduceRequest(pkt *LargeBufferReader, hdr *kafkaparser.KafkaRequest }, false, nil } -func processFetchRequest(pkt *LargeBufferReader, hdr *kafkaparser.KafkaRequestHeader, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { - fetchReq, err := kafkaparser.ParseFetchRequest(pkt, hdr) +func processFetchRequest(hdr kafkaparser.KafkaRequestHeader, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { + r, err := hdr.NewBodyReader() + if err != nil { + return nil, true, err + } + + fetchReq, err := kafkaparser.ParseFetchRequest(&r, hdr) if err != nil { return nil, true, err } @@ -126,7 +130,7 @@ func processFetchRequest(pkt *LargeBufferReader, hdr *kafkaparser.KafkaRequestHe } } return &KafkaInfo{ - ClientID: hdr.ClientID, + ClientID: hdr.ClientID(), Operation: Fetch, // TODO: handle multiple topics Topic: topicName, @@ -134,16 +138,17 @@ func processFetchRequest(pkt *LargeBufferReader, hdr *kafkaparser.KafkaRequestHe }, false, nil } -func processMetadataResponse(rpkt *LargeBufferReader, hdr *kafkaparser.KafkaRequestHeader, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { +func processMetadataResponse(rpkt *largebuf.LargeBuffer, hdr kafkaparser.KafkaRequestHeader, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { if rpkt == nil { return nil, true, errors.New("no response buffer for metadata request") } // only interested in response - _, err := kafkaparser.ParseKafkaResponseHeader(rpkt, hdr) + r := rpkt.NewReader() + _, err := kafkaparser.ParseKafkaResponseHeader(&r, hdr) if err != nil { return nil, true, err } - metadataResponse, err := kafkaparser.ParseMetadataResponse(rpkt, hdr) + metadataResponse, err := kafkaparser.ParseMetadataResponse(&r, hdr) if err != nil { return nil, true, err } @@ -153,16 +158,16 @@ func processMetadataResponse(rpkt *LargeBufferReader, hdr *kafkaparser.KafkaRequ return nil, true, nil } -func ProcessKafkaRequest(pkt *LargeBufferReader, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { - hdr, err := kafkaparser.ParseKafkaRequestHeader(pkt) +func ProcessKafkaRequest(pkt *largebuf.LargeBuffer, kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string]) (*KafkaInfo, bool, error) { + hdr, err := kafkaparser.NewKafkaRequestHeader(pkt) if err != nil { return nil, true, err } - switch hdr.APIKey { + switch hdr.APIKey() { case kafkaparser.APIKeyProduce: - return processProduceRequest(pkt, hdr) + return processProduceRequest(hdr) case kafkaparser.APIKeyFetch: - return processFetchRequest(pkt, hdr, kafkaTopicUUIDToName) + return processFetchRequest(hdr, kafkaTopicUUIDToName) default: return nil, true, errors.New("unsupported Kafka API key") } diff --git a/pkg/ebpf/common/kafka_detect_transform_test.go b/pkg/ebpf/common/kafka_detect_transform_test.go index 5d2c186f14..c211670c8c 100644 --- a/pkg/ebpf/common/kafka_detect_transform_test.go +++ b/pkg/ebpf/common/kafka_detect_transform_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/obi/pkg/internal/ebpf/kafkaparser" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func TestProcessKafkaRequest(t *testing.T) { @@ -90,7 +91,7 @@ func TestProcessKafkaRequest(t *testing.T) { ClientID string } */ - 0, 0, 0, 80, 0, 3, 0, 12, 2, 0, 0, 0, 0, 0, + 0, 0, 0, 80, 0, 3, 0, 12, 2, 0, 0, 0, 0, 0, 0, }, response: []byte{ // Header @@ -172,12 +173,12 @@ func TestProcessKafkaRequest(t *testing.T) { cache, _ := simplelru.NewLRU[kafkaparser.UUID, string](1000, nil) if len(tt.preRequests) > 0 { for _, preInput := range tt.preRequests { - _, ignore, err := ProcessKafkaEvent(NewLargeBufferFrom(preInput.request).NewReader(), NewLargeBufferFrom(preInput.response).NewReader(), cache) + _, ignore, err := ProcessKafkaEvent(largebuf.NewLargeBufferFrom(preInput.request), largebuf.NewLargeBufferFrom(preInput.response), cache) require.NoError(t, err) require.True(t, ignore) } } - res, _, err := ProcessKafkaEvent(NewLargeBufferFrom(tt.request).NewReader(), nil, cache) + res, _, err := ProcessKafkaEvent(largebuf.NewLargeBufferFrom(tt.request), nil, cache) if tt.err { assert.Error(t, err) return diff --git a/pkg/ebpf/common/mongo_detect_transform.go b/pkg/ebpf/common/mongo_detect_transform.go index 0da36d3ea3..2d07356ce1 100644 --- a/pkg/ebpf/common/mongo_detect_transform.go +++ b/pkg/ebpf/common/mongo_detect_transform.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) type mongoSpanInfo struct { @@ -359,7 +360,7 @@ func validateFlagBits(flagBits int32) error { return nil } -func mongoInfoFromEvent(event *TCPRequestInfo, requestBuffer *LargeBuffer, responseBuffer *LargeBuffer, mongoRequestCache PendingMongoDBRequests) *mongoSpanInfo { +func mongoInfoFromEvent(event *TCPRequestInfo, requestBuffer *largebuf.LargeBuffer, responseBuffer *largebuf.LargeBuffer, mongoRequestCache PendingMongoDBRequests) *mongoSpanInfo { if event.Direction == 0 { return nil } diff --git a/pkg/ebpf/common/mqtt_detect_transform.go b/pkg/ebpf/common/mqtt_detect_transform.go index 418b27d316..074e7e51f6 100644 --- a/pkg/ebpf/common/mqtt_detect_transform.go +++ b/pkg/ebpf/common/mqtt_detect_transform.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/mqttparser" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) // MQTTInfo holds parsed information from an MQTT packet. @@ -46,7 +47,7 @@ func packetTypeToMethod(packetType mqttparser.PacketType) string { // ProcessPossibleMQTTEvent processes a TCP packet and returns error if the packet is not a valid MQTT packet. // Otherwise, returns MQTTInfo with the processed data. The ignore bool indicates whether the event // should be ignored for span creation (e.g., control packets like CONNECT). -func ProcessPossibleMQTTEvent(event *TCPRequestInfo, pkt *LargeBuffer, rpkt *LargeBuffer) (*MQTTInfo, bool, error) { +func ProcessPossibleMQTTEvent(event *TCPRequestInfo, pkt *largebuf.LargeBuffer, rpkt *largebuf.LargeBuffer) (*MQTTInfo, bool, error) { m, ignore, err := ProcessMQTTEvent(pkt.UnsafeView()) if err != nil { // If we are getting the information in the response buffer, the event @@ -177,7 +178,7 @@ func processConnectPacket(pkt []byte, offset int) (*MQTTInfo, bool, error) { // isMQTT performs a quick heuristic check to determine if the packet looks like MQTT. // This is used for userspace protocol detection when the kernel hasn't classified the protocol. -func isMQTT(pkt *LargeBuffer) bool { +func isMQTT(pkt *largebuf.LargeBuffer) bool { if pkt == nil { return false } diff --git a/pkg/ebpf/common/mqtt_detect_transform_test.go b/pkg/ebpf/common/mqtt_detect_transform_test.go index fa18bf7d1f..f9af8a0ca3 100644 --- a/pkg/ebpf/common/mqtt_detect_transform_test.go +++ b/pkg/ebpf/common/mqtt_detect_transform_test.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/mqttparser" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func TestProcessMQTTEvent(t *testing.T) { @@ -295,7 +296,7 @@ func TestProcessPossibleMQTTEvent(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { event := &TCPRequestInfo{} - res, _, err := ProcessPossibleMQTTEvent(event, NewLargeBufferFrom(tt.request), NewLargeBufferFrom(tt.response)) + res, _, err := ProcessPossibleMQTTEvent(event, largebuf.NewLargeBufferFrom(tt.request), largebuf.NewLargeBufferFrom(tt.response)) if tt.err { assert.Error(t, err) return @@ -429,7 +430,7 @@ func TestIsMQTT(t *testing.T) { validPacket := []byte{0xC0, 0x00} // PINGREQ - minimal valid MQTT packet invalidPacket := []byte{0x00, 0x00} // Reserved packet type (invalid) - assert.True(t, isMQTT(NewLargeBufferFrom(validPacket)), "valid MQTT packet should return true") - assert.False(t, isMQTT(NewLargeBufferFrom(invalidPacket)), "invalid packet should return false") + assert.True(t, isMQTT(largebuf.NewLargeBufferFrom(validPacket)), "valid MQTT packet should return true") + assert.False(t, isMQTT(largebuf.NewLargeBufferFrom(invalidPacket)), "invalid packet should return false") assert.False(t, isMQTT(nil), "nil packet should return false") } diff --git a/pkg/ebpf/common/redis_detect_transform.go b/pkg/ebpf/common/redis_detect_transform.go index c5f877e970..8a420dbfc2 100644 --- a/pkg/ebpf/common/redis_detect_transform.go +++ b/pkg/ebpf/common/redis_detect_transform.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" "go.opentelemetry.io/obi/pkg/internal/split" ) @@ -32,7 +33,7 @@ var redisErrorCodes = [...]string{ "READONLY ", } -func isRedis(buf *LargeBuffer) bool { +func isRedis(buf *largebuf.LargeBuffer) bool { if buf.Len() < minRedisFrameLen { return false } @@ -182,7 +183,7 @@ func parseRedisRequest(buf string) (string, string, bool) { return op, strings.TrimSpace(text.String()), true } -func redisStatus(buf *LargeBuffer) (request.DBError, int) { +func redisStatus(buf *largebuf.LargeBuffer) (request.DBError, int) { if buf.Len() == 0 { return request.DBError{}, 0 } diff --git a/pkg/ebpf/common/redis_detect_transform_test.go b/pkg/ebpf/common/redis_detect_transform_test.go index cba6effbd7..f2a9e8de40 100644 --- a/pkg/ebpf/common/redis_detect_transform_test.go +++ b/pkg/ebpf/common/redis_detect_transform_test.go @@ -9,6 +9,8 @@ import ( "github.com/hashicorp/golang-lru/v2/simplelru" "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) type crlfTest struct { @@ -75,8 +77,8 @@ func TestRedisParsing(t *testing.T) { func TestIsRedis(t *testing.T) { buf := []byte{42, 51, 13, 10, 36, 52, 13, 10, 72, 71, 69, 84, 13, 10, 36, 51, 54, 13, 10, 56, 97, 100, 48, 101, 56, 99, 97, 45, 101, 97, 49, 57, 45, 52, 50, 97, 57, 45, 98, 51, 55, 48, 45, 98, 99, 97, 102, 102, 50, 55, 54, 55, 98, 56, 54, 13, 10, 36, 52, 13, 10, 99, 97, 114, 116, 13, 10, 103, 58, 32, 34, 51, 49, 117, 50, 107, 97, 100, 98, 108, 113, 53, 106, 34, 13, 10, 99, 111, 110, 116, 101, 110, 116, 45, 108, 101, 110, 103, 116, 104, 58, 32, 49, 57, 57, 13, 10, 118, 97, 114, 121, 58, 32, 65, 99, 99, 101, 112, 116, 45, 69, 110, 99, 111, 100, 105, 110, 103, 13, 10, 100, 97, 116, 101, 58, 32, 87, 101, 100, 44, 32, 48, 51, 32, 74, 117, 108, 32, 50, 48, 50, 52, 32, 49, 55, 58, 52, 54, 58, 49, 55, 32, 71, 77, 84, 13, 10, 120, 45, 101, 110, 118, 111, 121, 45, 117, 112, 115, 116, 114, 101, 97, 109, 45, 115, 101, 114, 118, 105, 99, 101, 45, 116, 105, 109, 101, 58, 32, 51, 13, 10, 115, 101, 114, 118, 101, 114, 58, 32, 101, 110, 118, 111, 121, 13, 10, 13, 10, 91, 34, 90, 65, 82, 34, 44, 34, 73, 83, 75, 34, 44, 34, 73, 76, 83, 34, 44, 34, 82, 79, 78, 34, 44, 34, 71, 66, 80, 34, 44, 34, 66, 82, 76, 34, 44, 34} rbuf := []byte{36, 45, 49, 13, 10, 1, 0, 15, 0, 3, 89, 130, 0, 32, 99, 111, 110, 115, 117, 109, 101, 114, 45, 102, 114, 97, 117, 100, 100, 101, 116, 101, 99, 116, 105, 111, 110, 115, 101, 114, 118, 105, 99, 101, 45, 49, 0, 0, 0, 1, 244, 0, 0, 0, 1, 3, 32, 0, 0, 0, 17, 170, 173, 222, 0, 0, 141, 2, 1, 1, 1, 0, 101, 112, 116, 45, 114, 97, 110, 103, 101, 115, 58, 32, 98, 121, 116, 101, 115, 13, 10, 108, 97, 115, 116, 45, 109, 111, 100, 105, 102, 105, 101, 100, 58, 32, 70, 114, 105, 44, 32, 48, 55, 32, 74, 117, 110, 32, 50, 48, 50, 52, 32, 48, 48, 58, 53, 55} - assert.True(t, isRedis(NewLargeBufferFrom(buf))) - assert.True(t, isRedis(NewLargeBufferFrom(rbuf))) + assert.True(t, isRedis(largebuf.NewLargeBufferFrom(buf))) + assert.True(t, isRedis(largebuf.NewLargeBufferFrom(rbuf))) } func TestGetRedisDb(t *testing.T) { diff --git a/pkg/ebpf/common/sql_detect_mysql.go b/pkg/ebpf/common/sql_detect_mysql.go index d5199f299e..4cdeabd643 100644 --- a/pkg/ebpf/common/sql_detect_mysql.go +++ b/pkg/ebpf/common/sql_detect_mysql.go @@ -9,6 +9,7 @@ import ( "strings" "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/internal/largebuf" "go.opentelemetry.io/obi/pkg/internal/sqlprune" ) @@ -84,7 +85,7 @@ func mysqlPreparedStatements(b []byte) (string, string, string) { return op, table, sql } -func handleMySQL(parseCtx *EBPFParseContext, event *TCPRequestInfo, requestBuffer, responseBuffer *LargeBuffer) (request.Span, error) { +func handleMySQL(parseCtx *EBPFParseContext, event *TCPRequestInfo, requestBuffer, responseBuffer *largebuf.LargeBuffer) (request.Span, error) { var ( op, table, stmt string span request.Span diff --git a/pkg/ebpf/common/sql_detect_postgres.go b/pkg/ebpf/common/sql_detect_postgres.go index 7f2d1767c6..b024fe45d9 100644 --- a/pkg/ebpf/common/sql_detect_postgres.go +++ b/pkg/ebpf/common/sql_detect_postgres.go @@ -13,6 +13,7 @@ import ( "golang.org/x/sys/unix" "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/internal/largebuf" "go.opentelemetry.io/obi/pkg/internal/sqlprune" ) @@ -192,7 +193,7 @@ type postgresMessage struct { } type postgresMessageIterator struct { - r *LargeBufferReader + r *largebuf.LargeBufferReader err error eof bool } @@ -248,7 +249,7 @@ func (it *postgresMessageIterator) next() (msg postgresMessage) { return } -func handlePostgres(parseCtx *EBPFParseContext, event *TCPRequestInfo, requestBuffer, responseBuffer *LargeBufferReader) (request.Span, error) { +func handlePostgres(parseCtx *EBPFParseContext, event *TCPRequestInfo, requestBuffer, responseBuffer *largebuf.LargeBufferReader) (request.Span, error) { var ( hasSpan bool op, table, stmt string diff --git a/pkg/ebpf/common/sql_detect_postgres_test.go b/pkg/ebpf/common/sql_detect_postgres_test.go index 57f5aded73..b8578e5847 100644 --- a/pkg/ebpf/common/sql_detect_postgres_test.go +++ b/pkg/ebpf/common/sql_detect_postgres_test.go @@ -8,6 +8,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func TestPostgresMessagesIterator(t *testing.T) { @@ -82,7 +84,8 @@ func TestPostgresMessagesIterator(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var got []postgresMessage - it := &postgresMessageIterator{r: NewLargeBufferFrom(tt.buf).NewReader()} + rr := largebuf.NewLargeBufferFrom(tt.buf).NewReader() + it := &postgresMessageIterator{r: &rr} for { msg := it.next() if it.isEOF() { @@ -112,11 +115,11 @@ func TestPostgresMessagesIteratorNoAllocs(t *testing.T) { return b }() - lb := NewLargeBufferFrom(buf) + lb := largebuf.NewLargeBufferFrom(buf) r := lb.NewReader() allocs := testing.AllocsPerRun(1000, func() { r.Reset() - it := postgresMessageIterator{r: r} + it := postgresMessageIterator{r: &r} for { it.next() diff --git a/pkg/ebpf/common/sql_detect_transform.go b/pkg/ebpf/common/sql_detect_transform.go index 1148028c82..bd74062831 100644 --- a/pkg/ebpf/common/sql_detect_transform.go +++ b/pkg/ebpf/common/sql_detect_transform.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/internal/largebuf" "go.opentelemetry.io/obi/pkg/internal/sqlprune" ) @@ -58,7 +59,7 @@ func isASCII(s string) bool { return true } -func detectSQLPayload(useHeuristics bool, b *LargeBuffer) (string, string, string, request.SQLKind) { +func detectSQLPayload(useHeuristics bool, b *largebuf.LargeBuffer) (string, string, string, request.SQLKind) { raw := b.UnsafeView() sqlKind := sqlKind(raw) if !useHeuristics { diff --git a/pkg/ebpf/common/sql_detect_transform_test.go b/pkg/ebpf/common/sql_detect_transform_test.go index 1884005aef..4f913e344d 100644 --- a/pkg/ebpf/common/sql_detect_transform_test.go +++ b/pkg/ebpf/common/sql_detect_transform_test.go @@ -8,6 +8,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) type bindParseResult struct { @@ -185,12 +187,12 @@ func TestPostgresQueryParsing(t *testing.T) { }, } { t.Run(ts.name, func(t *testing.T) { - op, table, sql, _ := detectSQLPayload(false, NewLargeBufferFrom(ts.bytes)) + op, table, sql, _ := detectSQLPayload(false, largebuf.NewLargeBufferFrom(ts.bytes)) assert.Equal(t, ts.op, op) assert.Equal(t, ts.table, table) assert.Equal(t, ts.sql, sql) - op, table, sql, _ = detectSQLPayload(true, NewLargeBufferFrom(ts.bytes)) + op, table, sql, _ = detectSQLPayload(true, largebuf.NewLargeBufferFrom(ts.bytes)) assert.Equal(t, ts.op, op) assert.Equal(t, ts.table, table) assert.Equal(t, ts.sql, sql) diff --git a/pkg/ebpf/common/tcp_detect_transform.go b/pkg/ebpf/common/tcp_detect_transform.go index c6849639f2..8bba961586 100644 --- a/pkg/ebpf/common/tcp_detect_transform.go +++ b/pkg/ebpf/common/tcp_detect_transform.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/config" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) var ( @@ -50,7 +51,7 @@ func ReadTCPRequestIntoSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, // We might know already the protocol for this event switch event.ProtocolType { case ProtocolTypeKafka: - k, ignore, err := ProcessPossibleKafkaEvent(event, requestBuffer.NewReader(), responseBuffer.NewReader(), parseCtx.kafkaTopicUUIDToName) + k, ignore, err := ProcessPossibleKafkaEvent(event, requestBuffer, responseBuffer, parseCtx.kafkaTopicUUIDToName) if ignore && err == nil { return request.Span{}, true, nil // parsed kafka event, but we don't want to create a span for it } @@ -82,7 +83,8 @@ func ReadTCPRequestIntoSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, return span, false, nil case ProtocolTypePostgres: - span, err := handlePostgres(parseCtx, event, requestBuffer.NewReader(), responseBuffer.NewReader()) + reqR, respR := requestBuffer.NewReader(), responseBuffer.NewReader() + span, err := handlePostgres(parseCtx, event, &reqR, &respR) if errors.Is(err, errFallback) { slog.Debug("Postgres: falling back to generic handler") break @@ -179,7 +181,7 @@ func ReadTCPRequestIntoSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, return request.Span{}, true, nil // ignore for now, next event will be parsed } else { // we should not arrive here, leave it for completeness - k, ignore, err := ProcessPossibleKafkaEvent(event, requestBuffer.NewReader(), responseBuffer.NewReader(), parseCtx.kafkaTopicUUIDToName) + k, ignore, err := ProcessPossibleKafkaEvent(event, requestBuffer, responseBuffer, parseCtx.kafkaTopicUUIDToName) if ignore && err == nil { return request.Span{}, true, nil // parsed kafka event, but we don't want to create a span for it } @@ -197,18 +199,18 @@ func ReadTCPRequestIntoSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, return request.Span{}, true, nil // ignore if we couldn't parse it } -func getBuffers(parseCtx *EBPFParseContext, event *TCPRequestInfo) (req *LargeBuffer, resp *LargeBuffer) { +func getBuffers(parseCtx *EBPFParseContext, event *TCPRequestInfo) (req *largebuf.LargeBuffer, resp *largebuf.LargeBuffer) { l := int(event.Len) if l < 0 || len(event.Buf) < l { l = len(event.Buf) } - req = NewLargeBufferFrom(event.Buf[:l]) + req = largebuf.NewLargeBufferFrom(event.Buf[:l]) l = int(event.RespLen) if l < 0 || len(event.Rbuf) < l { l = len(event.Rbuf) } - resp = NewLargeBufferFrom(event.Rbuf[:l]) + resp = largebuf.NewLargeBufferFrom(event.Rbuf[:l]) if event.HasLargeBuffers == 1 { if b, ok := extractTCPLargeBuffer(parseCtx, event.Tp.TraceId, packetTypeRequest, directionByPacketType(packetTypeRequest, !event.IsServer), event.ConnInfo); ok { diff --git a/pkg/ebpf/common/tcp_detect_transform_test.go b/pkg/ebpf/common/tcp_detect_transform_test.go index da17cdfd2e..8af2ed8501 100644 --- a/pkg/ebpf/common/tcp_detect_transform_test.go +++ b/pkg/ebpf/common/tcp_detect_transform_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/svc" "go.opentelemetry.io/obi/pkg/config" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func TestTCPReqSQLParsing(t *testing.T) { @@ -168,7 +169,7 @@ func TestRedisDetection(t *testing.T) { } { lines := strings.Split(s, "|") test := strings.Join(lines, "\r\n") - assert.True(t, isRedis(NewLargeBufferFrom([]uint8(test)))) + assert.True(t, isRedis(largebuf.NewLargeBufferFrom([]uint8(test)))) assert.True(t, isRedisOp([]uint8(test))) } @@ -182,7 +183,7 @@ func TestRedisDetection(t *testing.T) { } { lines := strings.Split(s, "|") test := strings.Join(lines, "\r\n") - assert.False(t, isRedis(NewLargeBufferFrom([]uint8(test)))) + assert.False(t, isRedis(largebuf.NewLargeBufferFrom([]uint8(test)))) assert.False(t, isRedisOp([]uint8(test))) } } @@ -191,7 +192,7 @@ func TestTCPReqKafkaParsing(t *testing.T) { // kafka message b := []byte{0, 0, 0, 94, 0, 1, 0, 11, 0, 0, 0, 224, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 6, 64, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 19, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0} r := makeTCPReq(string(b), 343534) - k, _, err := ProcessKafkaRequest(NewLargeBufferFrom(b).NewReader(), nil) + k, _, err := ProcessKafkaRequest(largebuf.NewLargeBufferFrom(b), nil) require.NoError(t, err) s := TCPToKafkaToSpan(&r, k) assert.NotNil(t, s) @@ -240,7 +241,7 @@ func TestTCPReqMQTTHeuristicFailure(t *testing.T) { } // Verify the heuristic passes but full parsing fails - assert.True(t, isMQTT(NewLargeBufferFrom(b)), "packet should pass isMQTT heuristic") + assert.True(t, isMQTT(largebuf.NewLargeBufferFrom(b)), "packet should pass isMQTT heuristic") _, _, err := ProcessMQTTEvent(b) require.Error(t, err, "full MQTT parsing should fail") diff --git a/pkg/ebpf/common/tcp_large_buffer.go b/pkg/ebpf/common/tcp_large_buffer.go index 5cc095e52d..ff272da4a3 100644 --- a/pkg/ebpf/common/tcp_large_buffer.go +++ b/pkg/ebpf/common/tcp_large_buffer.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) type largeBufferKey struct { @@ -47,7 +48,7 @@ func appendTCPLargeBuffer(parseCtx *EBPFParseContext, record *ringbuf.Record) (r switch event.Action { case largeBufferActionInit: - lb := NewLargeBuffer() + lb := largebuf.NewLargeBuffer() lb.AppendChunk(chunk) parseCtx.largeBuffers.Add(key, lb) @@ -70,7 +71,7 @@ func extractTCPLargeBuffer( traceID [16]uint8, packetType, direction uint8, connInfo BpfConnectionInfoT, -) (*LargeBuffer, bool) { +) (*largebuf.LargeBuffer, bool) { key := largeBufferKey{ traceID: traceID, packetType: packetType, diff --git a/pkg/internal/ebpf/kafkaparser/common.go b/pkg/internal/ebpf/kafkaparser/common.go index 443294facc..57ff69770a 100644 --- a/pkg/internal/ebpf/kafkaparser/common.go +++ b/pkg/internal/ebpf/kafkaparser/common.go @@ -6,6 +6,8 @@ package kafkaparser // import "go.opentelemetry.io/obi/pkg/internal/ebpf/kafkapa import ( "encoding/binary" "errors" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) const ( @@ -35,88 +37,102 @@ const ( type UUID [UUIDLen]byte +// KafkaRequestHeader is a zero-copy view over a *largebuf.LargeBuffer. +// Fixed-width fields are read on demand via scalar accessors; ClientID is +// read on demand from offset 14 using the stored length. type KafkaRequestHeader struct { - MessageSize int32 - APIKey KafkaAPIKey - APIVersion int16 - CorrelationID int32 - ClientID string + lb *largebuf.LargeBuffer + bodyOffset int32 // absolute offset of the first request-body byte + clientIDLen int16 // ≥ 0 guaranteed after successful construction } -type KafkaResponseHeader struct { - MessageSize int32 - CorrelationID int32 +func (h KafkaRequestHeader) MessageSize() int32 { + v, _ := h.lb.I32BEAt(0) + return v } -// byteReader is the sequential-read interface satisfied by *LargeBuffer. -// Defined here so sub-packages don't need to import ebpfcommon (which would be circular). -type byteReader interface { - ReadN(n int) ([]byte, error) - Peek(n int) ([]byte, error) - Skip(n int) error - Remaining() int +func (h KafkaRequestHeader) APIKey() KafkaAPIKey { + v, _ := h.lb.I16BEAt(4) + return KafkaAPIKey(v) } -func ParseKafkaRequestHeader(r byteReader) (*KafkaRequestHeader, error) { - if r.Remaining() < MinKafkaRequestLen { - return nil, errors.New("packet too short for Kafka request header") - } +func (h KafkaRequestHeader) APIVersion() int16 { + v, _ := h.lb.I16BEAt(6) + return v +} - msgSizeBytes, err := r.ReadN(Int32Len) - if err != nil { - return nil, err +func (h KafkaRequestHeader) CorrelationID() int32 { + v, _ := h.lb.I32BEAt(8) + return v +} + +// ClientID reads the client ID on demand from the underlying buffer. +// The Kafka wire format stores the length as an INT16 at offset 12, +// followed by N UTF-8 bytes starting at offset 14. +func (h KafkaRequestHeader) ClientID() string { + if h.clientIDLen == 0 { + return "" } - apiKeyBytes, err := r.ReadN(Int16Len) + b, _ := h.lb.UnsafeViewAt(14, int(h.clientIDLen)) + return string(b) +} + +func (h KafkaRequestHeader) NewBodyReader() (largebuf.LargeBufferReader, error) { + r := h.lb.NewReader() + + err := r.Skip(int(h.bodyOffset)) if err != nil { - return nil, err + return largebuf.LargeBufferReader{}, err } - apiVersionBytes, err := r.ReadN(Int16Len) - if err != nil { - return nil, err - } - correlationIDBytes, err := r.ReadN(Int32Len) - if err != nil { - return nil, err + + return r, nil +} + +func NewKafkaRequestHeader(lb *largebuf.LargeBuffer) (KafkaRequestHeader, error) { + if lb.Len() < MinKafkaRequestLen { + return KafkaRequestHeader{}, errors.New("packet too short for Kafka request header") } - header := &KafkaRequestHeader{ - MessageSize: int32(binary.BigEndian.Uint32(msgSizeBytes)), - APIKey: KafkaAPIKey(int16(binary.BigEndian.Uint16(apiKeyBytes))), - APIVersion: int16(binary.BigEndian.Uint16(apiVersionBytes)), - CorrelationID: int32(binary.BigEndian.Uint32(correlationIDBytes)), + + h := KafkaRequestHeader{lb: lb} + + if err := h.validate(); err != nil { + return KafkaRequestHeader{}, err } - clientIDSizeBytes, err := r.ReadN(Int16Len) + // ClientID: length at offset 12 (INT16), data at offset 14. + clientIDLen, err := lb.I16BEAt(12) if err != nil { - return nil, err + return KafkaRequestHeader{}, err } - clientIDSize := int16(binary.BigEndian.Uint16(clientIDSizeBytes)) - if err := validateKafkaRequestHeader(header); err != nil { - return nil, err - } - if clientIDSize < 0 { - return nil, errors.New("invalid client ID size") + if clientIDLen < 0 { + return KafkaRequestHeader{}, errors.New("invalid client ID size") } - if clientIDSize == 0 { - header.ClientID = "" - return header, nil - } - if r.Remaining() < int(clientIDSize) { - return nil, errors.New("packet too short for client ID") - } - clientIDBytes, err := r.ReadN(int(clientIDSize)) - if err != nil { - return nil, err + + clientIDEnd := 14 + int(clientIDLen) + if lb.Len() < clientIDEnd { + return KafkaRequestHeader{}, errors.New("packet too short for client ID") } - header.ClientID = string(clientIDBytes) - if err := skipTaggedFields(r, header); err != nil { - return nil, err + h.clientIDLen = clientIDLen + + bodyOff := clientIDEnd + if isFlexible(h) { + bodyOff, err = skipTaggedFieldsAt(lb, clientIDEnd) + if err != nil { + return KafkaRequestHeader{}, err + } } - return header, nil + h.bodyOffset = int32(bodyOff) + return h, nil +} + +type KafkaResponseHeader struct { + MessageSize int32 + CorrelationID int32 } -func ParseKafkaResponseHeader(r byteReader, requestHeader *KafkaRequestHeader) (*KafkaResponseHeader, error) { +func ParseKafkaResponseHeader(r *largebuf.LargeBufferReader, requestHeader KafkaRequestHeader) (*KafkaResponseHeader, error) { if r.Remaining() < MinKafkaResponseLen { return nil, errors.New("packet too short for Kafka response header") } @@ -142,7 +158,7 @@ func ParseKafkaResponseHeader(r byteReader, requestHeader *KafkaRequestHeader) ( return header, nil } -func skipTaggedFields(r byteReader, header *KafkaRequestHeader) error { +func skipTaggedFields(r *largebuf.LargeBufferReader, header KafkaRequestHeader) error { if !isFlexible(header) { return nil // no tagged fields to skip for non-flexible versions } @@ -165,38 +181,90 @@ func skipTaggedFields(r byteReader, header *KafkaRequestHeader) error { return nil } -func validateKafkaRequestHeader(header *KafkaRequestHeader) error { - if header.MessageSize < int32(MinKafkaRequestLen) || header.APIVersion < 0 { - return errors.New("invalid Kafka request header: size or version is negative") +// skipTaggedFieldsAt skips flexible-version tagged fields starting at absolute +// offset off in lb, returning the new absolute offset after all tagged fields. +func skipTaggedFieldsAt(lb *largebuf.LargeBuffer, off int) (int, error) { + count, n, err := readUVarintAt(lb, off) + if err != nil { + return 0, err + } + off += n + for range count { + _, n, err = readUVarintAt(lb, off) // tag ID + if err != nil { + return 0, err + } + off += n + tagLen, n, err := readUVarintAt(lb, off) // tag length + if err != nil { + return 0, err + } + if tagLen < 0 || tagLen > lb.Len()-off-n { + return 0, errors.New("tagged field value exceeds buffer") + } + off += n + tagLen + } + return off, nil +} + +// readUVarintAt reads an unsigned varint from lb at absolute offset off. +// Returns value, bytes consumed, and any error. +func readUVarintAt(lb *largebuf.LargeBuffer, off int) (int, int, error) { + value, shift, n := 0, 0, 0 + for { + b, err := lb.U8At(off + n) + if err != nil { + return 0, 0, errors.New("data ended before varint was complete") + } + n++ + if b&0x80 == 0 { + value |= int(b) << shift + return value, n, nil + } + value |= int(b&0x7F) << shift + shift += 7 + if shift > 28 { + return 0, 0, errors.New("illegal varint") + } + } +} + +func (h KafkaRequestHeader) validate() error { + if h.MessageSize() < int32(MinKafkaRequestLen) { + return errors.New("invalid Kafka request header: message size too small") } - if header.MessageSize > KafkaMaxPayloadLen { + if h.APIVersion() < 0 { + return errors.New("invalid Kafka request header: API version is negative") + } + + if h.MessageSize() > KafkaMaxPayloadLen { return errors.New("invalid Kafka request header: message size exceeds maximum payload length") } - switch header.APIKey { + switch h.APIKey() { case APIKeyFetch: - if header.APIVersion > 18 { // latest: Fetch Request (Version: 17) + if h.APIVersion() > 18 { // latest: Fetch Request (Version: 17) return errors.New("invalid Kafka request header: unsupported API key version for Fetch") } case APIKeyProduce: - if header.APIVersion > 13 { // latest: Produce Request (Version: 12) + if h.APIVersion() > 13 { // latest: Produce Request (Version: 12) return errors.New("invalid Kafka request header: unsupported API key version for Produce") } case APIKeyMetadata: - if header.APIVersion < 10 || header.APIVersion > 13 { // latest: Metadata Request (Version: 13), only versions 10-13 contain topic_id which we are interested in + if h.APIVersion() < 10 || h.APIVersion() > 13 { // latest: Metadata Request (Version: 13), only versions 10-13 contain topic_id which we are interested in return errors.New("invalid Kafka request header: unsupported API key version for Metadata") } default: return errors.New("invalid Kafka request header: unsupported API key") } - if header.CorrelationID < 0 { + if h.CorrelationID() < 0 { return errors.New("invalid Kafka request header: correlation ID is negative") } return nil } -func validateKafkaResponseHeader(header *KafkaResponseHeader, requestHeader *KafkaRequestHeader) error { +func validateKafkaResponseHeader(header *KafkaResponseHeader, requestHeader KafkaRequestHeader) error { if header.MessageSize < MinKafkaResponseLen { return errors.New("invalid Kafka response header: size too small") } @@ -208,7 +276,7 @@ func validateKafkaResponseHeader(header *KafkaResponseHeader, requestHeader *Kaf if header.CorrelationID < 0 { return errors.New("invalid Kafka response header: correlation ID is negative") } - if header.CorrelationID != requestHeader.CorrelationID { + if header.CorrelationID != requestHeader.CorrelationID() { return errors.New("invalid Kafka response header: correlation ID does not match request header") } return nil @@ -216,23 +284,24 @@ func validateKafkaResponseHeader(header *KafkaResponseHeader, requestHeader *Kaf // isFlexible checks for each API key if the version is flexible. // a flexible version uses a dynamic size for arrays and strings -func isFlexible(header *KafkaRequestHeader) bool { - switch header.APIKey { +func isFlexible(header KafkaRequestHeader) bool { + ver := header.APIVersion() + switch header.APIKey() { // https://github.com/apache/kafka/blob/9983331d917fe8f57c37c88f0749b757e5af0c87/clients/src/main/resources/common/message/ProduceRequest.json#L51 case APIKeyProduce: - return header.APIVersion >= 9 + return ver >= 9 // https://github.com/apache/kafka/blob/9983331d917fe8f57c37c88f0749b757e5af0c87/clients/src/main/resources/common/message/FetchRequest.json#L62C4-L62C20 case APIKeyFetch: - return header.APIVersion >= 12 + return ver >= 12 // https://github.com/apache/kafka/blob/9983331d917fe8f57c37c88f0749b757e5af0c87/clients/src/main/resources/common/message/MetadataRequest.json#L22 case APIKeyMetadata: - return header.APIVersion >= 9 + return ver >= 9 default: return false } } -func readArrayLength(r byteReader, header *KafkaRequestHeader) (int, error) { +func readArrayLength(r *largebuf.LargeBufferReader, header KafkaRequestHeader) (int, error) { if isFlexible(header) { size, err := readUnsignedVarint(r) if err != nil { @@ -246,7 +315,7 @@ func readArrayLength(r byteReader, header *KafkaRequestHeader) (int, error) { return readInt32(r) } -func readUUID(r byteReader) (*UUID, error) { +func readUUID(r *largebuf.LargeBufferReader) (*UUID, error) { b, err := r.ReadN(UUIDLen) if err != nil { return nil, errors.New("packet too short for topic UUID") @@ -256,7 +325,7 @@ func readUUID(r byteReader) (*UUID, error) { return &uuid, nil } -func readString(r byteReader, header *KafkaRequestHeader, nullable bool) (string, error) { +func readString(r *largebuf.LargeBufferReader, header KafkaRequestHeader, nullable bool) (string, error) { size, err := readStringLength(r, header, nullable) if err != nil { return "", err @@ -288,7 +357,7 @@ func validateKafkaString(pkt []byte, size int) bool { return true } -func readStringLength(r byteReader, header *KafkaRequestHeader, nullable bool) (int, error) { +func readStringLength(r *largebuf.LargeBufferReader, header KafkaRequestHeader, nullable bool) (int, error) { if !isFlexible(header) { // length is stored as a fixed size int16 if r.Remaining() < Int16Len { @@ -326,7 +395,7 @@ func readStringLength(r byteReader, header *KafkaRequestHeader, nullable bool) ( return size, nil } -func readUnsignedVarint(r byteReader) (int, error) { +func readUnsignedVarint(r *largebuf.LargeBufferReader) (int, error) { value := 0 i := 0 for { @@ -349,7 +418,7 @@ func readUnsignedVarint(r byteReader) (int, error) { } } -func readInt32(r byteReader) (int, error) { +func readInt32(r *largebuf.LargeBufferReader) (int, error) { b, err := r.ReadN(Int32Len) if err != nil { return 0, errors.New("data too short for int32") @@ -357,7 +426,7 @@ func readInt32(r byteReader) (int, error) { return int(binary.BigEndian.Uint32(b)), nil } -func readInt64(r byteReader) (int64, error) { +func readInt64(r *largebuf.LargeBufferReader) (int64, error) { b, err := r.ReadN(Int64Len) if err != nil { return 0, errors.New("data too short for int64") diff --git a/pkg/internal/ebpf/kafkaparser/common_test.go b/pkg/internal/ebpf/kafkaparser/common_test.go index 5486f245d6..f84977c47e 100644 --- a/pkg/internal/ebpf/kafkaparser/common_test.go +++ b/pkg/internal/ebpf/kafkaparser/common_test.go @@ -10,15 +10,21 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) -func TestParseKafkaRequestHeader(t *testing.T) { +func TestNewKafkaRequestHeader(t *testing.T) { tests := []struct { - name string - packet []byte - expectErr bool - flexible bool - expected *KafkaRequestHeader + name string + packet []byte + expectErr bool + flexible bool + expectedMessageSize int32 + expectedAPIKey KafkaAPIKey + expectedAPIVersion int16 + expectedCorrelationID int32 + expectedClientID string }{ { name: "valid fetch request header v1", @@ -32,14 +38,12 @@ func TestParseKafkaRequestHeader(t *testing.T) { copy(pkt[14:18], "test") // ClientID return pkt }(), - expectErr: false, - expected: &KafkaRequestHeader{ - MessageSize: 100, - APIKey: 1, - APIVersion: 1, - CorrelationID: 12345, - ClientID: "test", - }, + expectErr: false, + expectedMessageSize: 100, + expectedAPIKey: 1, + expectedAPIVersion: 1, + expectedCorrelationID: 12345, + expectedClientID: "test", }, { name: "valid produce request header v9 (flexible)", @@ -54,35 +58,33 @@ func TestParseKafkaRequestHeader(t *testing.T) { pkt[20] = 0 // 0 tagged_fields return pkt }(), - expectErr: false, - flexible: true, - expected: &KafkaRequestHeader{ - MessageSize: 150, - APIKey: 0, - APIVersion: 9, - CorrelationID: 54321, - ClientID: "client", - }, + expectErr: false, + flexible: true, + expectedMessageSize: 150, + expectedAPIKey: 0, + expectedAPIVersion: 9, + expectedCorrelationID: 54321, + expectedClientID: "client", }, { name: "valid metadata request header v10", packet: func() []byte { - pkt := make([]byte, 14) + pkt := make([]byte, 15) // MinKafkaRequestLen + 1 for tagged fields byte binary.BigEndian.PutUint32(pkt[0:4], 100) // MessageSize binary.BigEndian.PutUint16(pkt[4:6], 3) // APIKey (Metadata) binary.BigEndian.PutUint16(pkt[6:8], 10) // APIVersion binary.BigEndian.PutUint32(pkt[8:12], 98765) // CorrelationID binary.BigEndian.PutUint16(pkt[12:14], 0) // ClientID length (empty) + pkt[14] = 0 // tagged fields = 0 return pkt }(), - expectErr: false, - expected: &KafkaRequestHeader{ - MessageSize: 100, - APIKey: 3, - APIVersion: 10, - CorrelationID: 98765, - ClientID: "", - }, + expectErr: false, + flexible: true, + expectedMessageSize: 100, + expectedAPIKey: 3, + expectedAPIVersion: 10, + expectedCorrelationID: 98765, + expectedClientID: "", }, { name: "packet too short", @@ -90,7 +92,6 @@ func TestParseKafkaRequestHeader(t *testing.T) { return make([]byte, 10) // Less than MinKafkaRequestLen }(), expectErr: true, - expected: nil, }, { name: "invalid API key", @@ -104,7 +105,6 @@ func TestParseKafkaRequestHeader(t *testing.T) { return pkt }(), expectErr: true, - expected: nil, }, { name: "unsupported fetch version", @@ -118,7 +118,6 @@ func TestParseKafkaRequestHeader(t *testing.T) { return pkt }(), expectErr: true, - expected: nil, }, { name: "negative client ID size", @@ -132,7 +131,6 @@ func TestParseKafkaRequestHeader(t *testing.T) { return pkt }(), expectErr: true, - expected: nil, }, { name: "packet too short for client ID", @@ -146,14 +144,12 @@ func TestParseKafkaRequestHeader(t *testing.T) { return pkt }(), expectErr: true, - expected: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newBytesReader(tt.packet) - header, err := ParseKafkaRequestHeader(r) + header, err := NewKafkaRequestHeader(largebuf.NewLargeBufferFrom(tt.packet)) if tt.expectErr { assert.Error(t, err) @@ -161,124 +157,107 @@ func TestParseKafkaRequestHeader(t *testing.T) { } require.NoError(t, err) - require.NotNil(t, header) - assert.Equal(t, tt.expected.MessageSize, header.MessageSize) - assert.Equal(t, tt.expected.APIKey, header.APIKey) - assert.Equal(t, tt.expected.APIVersion, header.APIVersion) - assert.Equal(t, tt.expected.CorrelationID, header.CorrelationID) - assert.Equal(t, tt.expected.ClientID, header.ClientID) + assert.Equal(t, tt.expectedMessageSize, header.MessageSize()) + assert.Equal(t, tt.expectedAPIKey, header.APIKey()) + assert.Equal(t, tt.expectedAPIVersion, header.APIVersion()) + assert.Equal(t, tt.expectedCorrelationID, header.CorrelationID()) + assert.Equal(t, tt.expectedClientID, header.ClientID()) - expectedConsumed := MinKafkaRequestLen + len(tt.expected.ClientID) + expectedBodyOffset := MinKafkaRequestLen + len(tt.expectedClientID) if tt.flexible { - expectedConsumed++ // Account for tagged fields byte + expectedBodyOffset++ // Account for tagged fields byte } - assert.Equal(t, expectedConsumed, r.Pos()) + assert.Equal(t, expectedBodyOffset, int(header.bodyOffset)) }) } } func TestValidateKafkaHeader(t *testing.T) { tests := []struct { - name string - header *KafkaRequestHeader - expectErr bool + name string + msgSize int32 + apiKey KafkaAPIKey + apiVersion int16 + correlationID int32 + expectErr bool }{ { - name: "valid fetch header", - header: &KafkaRequestHeader{ - MessageSize: 100, - APIKey: APIKeyFetch, - APIVersion: 5, - CorrelationID: 123, - }, - expectErr: false, - }, - { - name: "valid produce header", - header: &KafkaRequestHeader{ - MessageSize: 200, - APIKey: APIKeyProduce, - APIVersion: 8, - CorrelationID: 456, - }, - expectErr: false, - }, - { - name: "valid metadata header", - header: &KafkaRequestHeader{ - MessageSize: 150, - APIKey: APIKeyMetadata, - APIVersion: 12, - CorrelationID: 789, - }, - expectErr: false, - }, - { - name: "message size too small", - header: &KafkaRequestHeader{ - MessageSize: 5, - APIKey: APIKeyFetch, - APIVersion: 1, - CorrelationID: 123, - }, - expectErr: true, + name: "valid fetch header", + msgSize: 100, + apiKey: APIKeyFetch, + apiVersion: 5, + correlationID: 123, }, { - name: "message size too large", - header: &KafkaRequestHeader{ - MessageSize: KafkaMaxPayloadLen + 1, - APIKey: APIKeyFetch, - APIVersion: 1, - CorrelationID: 123, - }, - expectErr: true, + name: "valid produce header", + msgSize: 200, + apiKey: APIKeyProduce, + apiVersion: 8, + correlationID: 456, }, { - name: "negative API version", - header: &KafkaRequestHeader{ - MessageSize: 100, - APIKey: APIKeyFetch, - APIVersion: -1, - CorrelationID: 123, - }, - expectErr: true, + name: "valid metadata header", + msgSize: 150, + apiKey: APIKeyMetadata, + apiVersion: 12, + correlationID: 789, }, { - name: "negative correlation ID", - header: &KafkaRequestHeader{ - MessageSize: 100, - APIKey: APIKeyFetch, - APIVersion: 1, - CorrelationID: -1, - }, - expectErr: true, + name: "message size too small", + msgSize: 5, + apiKey: APIKeyFetch, + apiVersion: 1, + correlationID: 123, + expectErr: true, }, { - name: "unsupported metadata version (too low)", - header: &KafkaRequestHeader{ - MessageSize: 100, - APIKey: APIKeyMetadata, - APIVersion: 9, - CorrelationID: 123, - }, - expectErr: true, + name: "message size too large", + msgSize: KafkaMaxPayloadLen + 1, + apiKey: APIKeyFetch, + apiVersion: 1, + correlationID: 123, + expectErr: true, }, { - name: "unsupported metadata version (too high)", - header: &KafkaRequestHeader{ - MessageSize: 100, - APIKey: APIKeyMetadata, - APIVersion: 14, - CorrelationID: 123, - }, - expectErr: true, + name: "negative API version", + msgSize: 100, + apiKey: APIKeyFetch, + apiVersion: -1, + correlationID: 123, + expectErr: true, + }, + { + name: "negative correlation ID", + msgSize: 100, + apiKey: APIKeyFetch, + apiVersion: 1, + correlationID: -1, + expectErr: true, + }, + { + name: "unsupported metadata version (too low)", + msgSize: 100, + apiKey: APIKeyMetadata, + apiVersion: 9, + correlationID: 123, + expectErr: true, + }, + { + name: "unsupported metadata version (too high)", + msgSize: 100, + apiKey: APIKeyMetadata, + apiVersion: 14, + correlationID: 123, + expectErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := validateKafkaRequestHeader(tt.header) + lb := buildRawHeaderBuf(tt.msgSize, tt.apiKey, tt.apiVersion, tt.correlationID) + h := KafkaRequestHeader{lb: lb} + err := h.validate() if tt.expectErr { assert.Error(t, err) } else { @@ -291,65 +270,16 @@ func TestValidateKafkaHeader(t *testing.T) { func TestIsFlexible(t *testing.T) { tests := []struct { name string - header *KafkaRequestHeader + header KafkaRequestHeader expected bool }{ - { - name: "produce v8 - not flexible", - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 8, - }, - expected: false, - }, - { - name: "produce v9 - flexible", - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 9, - }, - expected: true, - }, - { - name: "fetch v11 - not flexible", - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 11, - }, - expected: false, - }, - { - name: "fetch v12 - flexible", - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 12, - }, - expected: true, - }, - { - name: "metadata v8 - not flexible", - header: &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: 8, - }, - expected: false, - }, - { - name: "metadata v9 - flexible", - header: &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: 9, - }, - expected: true, - }, - { - name: "unknown API key", - header: &KafkaRequestHeader{ - APIKey: 99, - APIVersion: 1, - }, - expected: false, - }, + {name: "produce v8 - not flexible", header: newUncheckedHeader(APIKeyProduce, 8), expected: false}, + {name: "produce v9 - flexible", header: newUncheckedHeader(APIKeyProduce, 9), expected: true}, + {name: "fetch v11 - not flexible", header: newUncheckedHeader(APIKeyFetch, 11), expected: false}, + {name: "fetch v12 - flexible", header: newUncheckedHeader(APIKeyFetch, 12), expected: true}, + {name: "metadata v8 - not flexible", header: newUncheckedHeader(APIKeyMetadata, 8), expected: false}, + {name: "metadata v9 - flexible", header: newUncheckedHeader(APIKeyMetadata, 9), expected: true}, + {name: "unknown API key", header: newUncheckedHeader(99, 1), expected: false}, } for _, tt := range tests { @@ -364,7 +294,7 @@ func TestReadArrayLength(t *testing.T) { tests := []struct { name string packet []byte - header *KafkaRequestHeader + header KafkaRequestHeader offset int expectedLength int expectedOffset int @@ -377,10 +307,7 @@ func TestReadArrayLength(t *testing.T) { binary.BigEndian.PutUint32(pkt[0:4], 5) // Array length return pkt }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 5, // Non-flexible - }, + header: newUncheckedHeader(APIKeyFetch, 5), offset: 0, expectedLength: 5, expectedOffset: 4, @@ -392,10 +319,7 @@ func TestReadArrayLength(t *testing.T) { // Varint encoding of 6 (5+1 for flexible arrays) return []byte{0x06, 0x00, 0x00, 0x00} }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 12, // Flexible - }, + header: newUncheckedHeader(APIKeyFetch, 12), offset: 0, expectedLength: 5, expectedOffset: 1, @@ -405,8 +329,8 @@ func TestReadArrayLength(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newBytesReader(tt.packet[tt.offset:]) - length, err := readArrayLength(r, tt.header) + r := largebuf.NewLargeBufferFrom(tt.packet[tt.offset:]).NewReader() + length, err := readArrayLength(&r, tt.header) if tt.expectErr { assert.Error(t, err) @@ -415,7 +339,7 @@ func TestReadArrayLength(t *testing.T) { require.NoError(t, err) assert.Equal(t, tt.expectedLength, length) - assert.Equal(t, tt.expectedOffset-tt.offset, r.Pos()) + assert.Equal(t, tt.expectedOffset-tt.offset, r.ReadOffset()) }) } } @@ -483,8 +407,8 @@ func TestReadUUID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newBytesReader(tt.packet[tt.offset:]) - uuid, err := readUUID(r) + r := largebuf.NewLargeBufferFrom(tt.packet[tt.offset:]).NewReader() + uuid, err := readUUID(&r) if tt.expectErr { assert.Error(t, err) @@ -494,7 +418,7 @@ func TestReadUUID(t *testing.T) { require.NoError(t, err) require.NotNil(t, uuid) assert.Equal(t, tt.expectedUUID, *uuid) - assert.Equal(t, tt.expectedOffset-tt.offset, r.Pos()) + assert.Equal(t, tt.expectedOffset-tt.offset, r.ReadOffset()) }) } } @@ -503,7 +427,7 @@ func TestReadString(t *testing.T) { tests := []struct { name string packet []byte - header *KafkaRequestHeader + header KafkaRequestHeader offset int nullable bool expectedString string @@ -518,10 +442,7 @@ func TestReadString(t *testing.T) { copy(pkt[2:7], "hello") return pkt }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 5, // Non-flexible - }, + header: newUncheckedHeader(APIKeyFetch, 5), offset: 0, nullable: false, expectedString: "hello", @@ -536,10 +457,7 @@ func TestReadString(t *testing.T) { pkt = append(pkt, []byte("world")...) return pkt }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 12, // Flexible - }, + header: newUncheckedHeader(APIKeyFetch, 12), offset: 0, nullable: false, expectedString: "world", @@ -553,10 +471,7 @@ func TestReadString(t *testing.T) { binary.BigEndian.PutUint16(pkt[0:2], 10) // String length > available return pkt }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 5, - }, + header: newUncheckedHeader(APIKeyFetch, 5), offset: 0, nullable: false, expectErr: true, @@ -565,8 +480,8 @@ func TestReadString(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newBytesReader(tt.packet[tt.offset:]) - str, err := readString(r, tt.header, tt.nullable) + r := largebuf.NewLargeBufferFrom(tt.packet[tt.offset:]).NewReader() + str, err := readString(&r, tt.header, tt.nullable) if tt.expectErr { assert.Error(t, err) @@ -575,7 +490,7 @@ func TestReadString(t *testing.T) { require.NoError(t, err) assert.Equal(t, tt.expectedString, str) - assert.Equal(t, tt.expectedOffset-tt.offset, r.Pos()) + assert.Equal(t, tt.expectedOffset-tt.offset, r.ReadOffset()) }) } } @@ -629,8 +544,8 @@ func TestReadUnsignedVarint(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newBytesReader(tt.data[tt.offset:]) - value, err := readUnsignedVarint(r) + r := largebuf.NewLargeBufferFrom(tt.data[tt.offset:]).NewReader() + value, err := readUnsignedVarint(&r) if tt.expectErr { assert.Error(t, err) @@ -639,7 +554,7 @@ func TestReadUnsignedVarint(t *testing.T) { require.NoError(t, err) assert.Equal(t, tt.expectedValue, value) - assert.Equal(t, tt.expectedBytes, r.Pos()) + assert.Equal(t, tt.expectedBytes, r.ReadOffset()) }) } } @@ -680,7 +595,7 @@ func TestSkip(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newBytesReader(tt.packet[tt.offset:]) + r := largebuf.NewLargeBufferFrom(tt.packet[tt.offset:]).NewReader() err := r.Skip(tt.length) if tt.expectErr { @@ -689,13 +604,13 @@ func TestSkip(t *testing.T) { } require.NoError(t, err) - assert.Equal(t, tt.expectedBytes, r.Pos()) + assert.Equal(t, tt.expectedBytes, r.ReadOffset()) }) } } // Truncation tests to simulate incomplete packets -func TestParseKafkaRequestHeaderTruncation(t *testing.T) { +func TestNewKafkaRequestHeaderTruncation(t *testing.T) { // Create a valid header first validPacket := make([]byte, 18) binary.BigEndian.PutUint32(validPacket[0:4], 100) // MessageSize @@ -709,7 +624,7 @@ func TestParseKafkaRequestHeaderTruncation(t *testing.T) { for i := 1; i < len(validPacket); i++ { t.Run(fmt.Sprintf("truncated_at_%d", i), func(t *testing.T) { truncated := validPacket[:i] - _, err := ParseKafkaRequestHeader(newBytesReader(truncated)) + _, err := NewKafkaRequestHeader(largebuf.NewLargeBufferFrom(truncated)) assert.Error(t, err, "expected error for truncated packet at position %d", i) }) } diff --git a/pkg/internal/ebpf/kafkaparser/fetch.go b/pkg/internal/ebpf/kafkaparser/fetch.go index 7815998a53..429d3b5ef5 100644 --- a/pkg/internal/ebpf/kafkaparser/fetch.go +++ b/pkg/internal/ebpf/kafkaparser/fetch.go @@ -3,7 +3,11 @@ package kafkaparser // import "go.opentelemetry.io/obi/pkg/internal/ebpf/kafkaparser" -import "errors" +import ( + "errors" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" +) type FetchPartition struct { Partition int @@ -19,7 +23,7 @@ type FetchRequest struct { Topics []*FetchTopic } -func ParseFetchRequest(r byteReader, header *KafkaRequestHeader) (*FetchRequest, error) { +func ParseFetchRequest(r *largebuf.LargeBufferReader, header KafkaRequestHeader) (*FetchRequest, error) { if err := fetchRequestSkipUntilTopics(r, header); err != nil { return nil, err } @@ -38,10 +42,10 @@ func ParseFetchRequest(r byteReader, header *KafkaRequestHeader) (*FetchRequest, }, nil } -func fetchRequestSkipUntilTopics(r byteReader, header *KafkaRequestHeader) error { +func fetchRequestSkipUntilTopics(r *largebuf.LargeBufferReader, header KafkaRequestHeader) error { var skipLen int switch { - case header.APIVersion >= 15: + case header.APIVersion() >= 15: /* Fetch Request (Version: 15-17) => max_wait_ms min_bytes max_bytes isolation_level session_id session_epoch ... max_wait_ms => INT32 @@ -58,7 +62,7 @@ func fetchRequestSkipUntilTopics(r byteReader, header *KafkaRequestHeader) error Int8Len + // isolation_level Int32Len + // session_id Int32Len // session_epoch - case header.APIVersion >= 7: + case header.APIVersion() >= 7: /* Fetch Request (Version: 7-14) => replica_id max_wait_ms min_bytes max_bytes isolation_level session_id session_epoch ... replica_id => INT32 @@ -78,7 +82,7 @@ func fetchRequestSkipUntilTopics(r byteReader, header *KafkaRequestHeader) error Int32Len + // session_id Int32Len // session_epoch - case header.APIVersion >= 4: + case header.APIVersion() >= 4: /* Fetch Request (Version: 4-6) => replica_id max_wait_ms min_bytes max_bytes isolation_level [Topics] replica_id => INT32 @@ -100,7 +104,7 @@ func fetchRequestSkipUntilTopics(r byteReader, header *KafkaRequestHeader) error return nil } -func parseFetchTopics(r byteReader, header *KafkaRequestHeader) ([]*FetchTopic, error) { +func parseFetchTopics(r *largebuf.LargeBufferReader, header KafkaRequestHeader) ([]*FetchTopic, error) { topicsLen, err := readArrayLength(r, header) if err != nil { return nil, err @@ -119,10 +123,10 @@ func parseFetchTopics(r byteReader, header *KafkaRequestHeader) ([]*FetchTopic, return topics, nil } -func parseFetchTopic(r byteReader, header *KafkaRequestHeader) (*FetchTopic, error) { +func parseFetchTopic(r *largebuf.LargeBufferReader, header KafkaRequestHeader) (*FetchTopic, error) { var topic FetchTopic var err error - if header.APIVersion >= 13 { + if header.APIVersion() >= 13 { /* since kafka fetch request version 13, Topics are identified by a UUID and not by Name. correlation between topic Name and UUID is done by the metadata response. @@ -174,7 +178,7 @@ func parseFetchTopic(r byteReader, header *KafkaRequestHeader) (*FetchTopic, err return &topic, nil } -func parseFetchPartition(r byteReader, header *KafkaRequestHeader) (*FetchPartition, error) { +func parseFetchPartition(r *largebuf.LargeBufferReader, header KafkaRequestHeader) (*FetchPartition, error) { /* partitions => Partition fetch_offset log_start_offset partition_max_bytes Partition => INT32 @@ -184,7 +188,7 @@ func parseFetchPartition(r byteReader, header *KafkaRequestHeader) (*FetchPartit if err != nil { return nil, err } - if header.APIVersion >= 9 { + if header.APIVersion() >= 9 { /* fetch request version 9 and above include current_leader_epoch between Partition and fetch_offset. partitions => Partition current_leader_epoch fetch_offset last_fetched_epoch log_start_offset partition_max_bytes _tagged_fields @@ -206,10 +210,10 @@ func parseFetchPartition(r byteReader, header *KafkaRequestHeader) (*FetchPartit }, nil } -func skipFetchPartitions(r byteReader, header *KafkaRequestHeader, partitionCount int) error { +func skipFetchPartitions(r *largebuf.LargeBufferReader, header KafkaRequestHeader, partitionCount int) error { var fetchPartitionLen int switch { - case header.APIVersion >= 12: + case header.APIVersion() >= 12: /* partitions => Partition current_leader_epoch fetch_offset last_fetched_epoch log_start_offset partition_max_bytes _tagged_fields Partition => INT32 @@ -225,7 +229,7 @@ func skipFetchPartitions(r byteReader, header *KafkaRequestHeader, partitionCoun Int32Len + // last_fetched_epoch Int64Len + // log_start_offset Int32Len // partition_max_bytes - case header.APIVersion >= 9: + case header.APIVersion() >= 9: /* partitions => Partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes Partition => INT32 @@ -239,7 +243,7 @@ func skipFetchPartitions(r byteReader, header *KafkaRequestHeader, partitionCoun Int64Len + // fetch_offset Int64Len + // log_start_offset Int32Len // partition_max_bytes - case header.APIVersion >= 5: + case header.APIVersion() >= 5: /* partitions => Partition fetch_offset log_start_offset partition_max_bytes Partition => INT32 diff --git a/pkg/internal/ebpf/kafkaparser/fetch_test.go b/pkg/internal/ebpf/kafkaparser/fetch_test.go index dc943a129a..46fd4e5820 100644 --- a/pkg/internal/ebpf/kafkaparser/fetch_test.go +++ b/pkg/internal/ebpf/kafkaparser/fetch_test.go @@ -10,13 +10,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func TestParseFetchRequest(t *testing.T) { tests := []struct { name string packet []byte - header *KafkaRequestHeader + header KafkaRequestHeader expectErr bool expectedTopicCount int expectedTopicName string @@ -52,10 +54,7 @@ func TestParseFetchRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 4, - }, + header: newTestHeader(APIKeyFetch, 4), expectErr: false, expectedTopicCount: 1, expectedTopicName: "my-topic", @@ -96,10 +95,7 @@ func TestParseFetchRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 13, - }, + header: newTestHeader(APIKeyFetch, 13), expectErr: false, expectedTopicCount: 1, expectedTopicUUID: &UUID{ @@ -141,10 +137,7 @@ func TestParseFetchRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 15, - }, + header: newTestHeader(APIKeyFetch, 15), expectErr: false, expectedTopicCount: 1, expectedTopicUUID: &UUID{ @@ -186,10 +179,7 @@ func TestParseFetchRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 12, - }, + header: newTestHeader(APIKeyFetch, 12), expectErr: false, expectedTopicCount: 1, expectedTopicName: "my-topic", @@ -228,10 +218,7 @@ func TestParseFetchRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 17, - }, + header: newTestHeader(APIKeyFetch, 17), expectErr: false, expectedTopicCount: 1, expectedTopicUUID: &UUID{ @@ -276,10 +263,7 @@ func TestParseFetchRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 5, - }, + header: newTestHeader(APIKeyFetch, 5), expectErr: false, expectedTopicCount: 2, expectedTopicName: "topic1", // We'll check the first topic @@ -308,19 +292,13 @@ func TestParseFetchRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 4, - }, + header: newTestHeader(APIKeyFetch, 4), expectErr: true, // Should error on no Topics }, { - name: "fetch request packet too short for skip", - packet: []byte{0x01, 0x02}, // Too short - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 4, - }, + name: "fetch request packet too short for skip", + packet: []byte{0x01, 0x02}, // Too short + header: newTestHeader(APIKeyFetch, 4), expectErr: true, }, { @@ -344,17 +322,15 @@ func TestParseFetchRequest(t *testing.T) { // No space for Topics array length return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: 4, - }, + header: newTestHeader(APIKeyFetch, 4), expectErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - req, err := ParseFetchRequest(newBytesReader(tt.packet), tt.header) + r := largebuf.NewLargeBufferFrom(tt.packet).NewReader() + req, err := ParseFetchRequest(&r, tt.header) if tt.expectErr { assert.Error(t, err) @@ -386,10 +362,7 @@ func TestParseFetchRequestTruncation(t *testing.T) { for _, version := range versions { t.Run(fmt.Sprintf("version_%d_truncation", version), func(t *testing.T) { - header := &KafkaRequestHeader{ - APIKey: APIKeyFetch, - APIVersion: version, - } + header := newTestHeader(APIKeyFetch, version) // Create a valid packet for this version validPacket := createValidFetchPacket(version) @@ -398,7 +371,8 @@ func TestParseFetchRequestTruncation(t *testing.T) { for i := 1; i < len(validPacket); i++ { t.Run(fmt.Sprintf("truncated_at_%d", i), func(t *testing.T) { truncated := validPacket[:i] - _, err := ParseFetchRequest(newBytesReader(truncated), header) + r := largebuf.NewLargeBufferFrom(truncated).NewReader() + _, err := ParseFetchRequest(&r, header) assert.Error(t, err, "expected error for truncated packet at position %d for version %d", i, version) }) } diff --git a/pkg/internal/ebpf/kafkaparser/metadata.go b/pkg/internal/ebpf/kafkaparser/metadata.go index a8dcecd15f..e26a1714f2 100644 --- a/pkg/internal/ebpf/kafkaparser/metadata.go +++ b/pkg/internal/ebpf/kafkaparser/metadata.go @@ -5,6 +5,8 @@ package kafkaparser // import "go.opentelemetry.io/obi/pkg/internal/ebpf/kafkapa import ( "errors" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) const partitionLen = // 26 @@ -25,7 +27,7 @@ type MetadataResponse struct { Topics []*MetadataTopic } -func ParseMetadataResponse(r byteReader, header *KafkaRequestHeader) (*MetadataResponse, error) { +func ParseMetadataResponse(r *largebuf.LargeBufferReader, header KafkaRequestHeader) (*MetadataResponse, error) { if err := metadataResponseSkipUntilTopics(r, header); err != nil { return nil, err } @@ -41,7 +43,7 @@ func ParseMetadataResponse(r byteReader, header *KafkaRequestHeader) (*MetadataR }, nil } -func metadataResponseSkipUntilTopics(r byteReader, header *KafkaRequestHeader) error { +func metadataResponseSkipUntilTopics(r *largebuf.LargeBufferReader, header KafkaRequestHeader) error { if err := r.Skip(Int32Len); err != nil { // throttle_time_ms return err } @@ -56,7 +58,7 @@ func metadataResponseSkipUntilTopics(r byteReader, header *KafkaRequestHeader) e return r.Skip(clusterIDLen + Int32Len) // cluster_id + controller_id } -func skipMetadataResponseBrokers(r byteReader, header *KafkaRequestHeader) error { +func skipMetadataResponseBrokers(r *largebuf.LargeBufferReader, header KafkaRequestHeader) error { brokersLen, err := readArrayLength(r, header) if err != nil { return err @@ -88,7 +90,7 @@ func skipMetadataResponseBrokers(r byteReader, header *KafkaRequestHeader) error return nil } -func parseMetadataTopics(r byteReader, header *KafkaRequestHeader) ([]*MetadataTopic, error) { +func parseMetadataTopics(r *largebuf.LargeBufferReader, header KafkaRequestHeader) ([]*MetadataTopic, error) { topicsLen, err := readArrayLength(r, header) if err != nil { return nil, err @@ -107,7 +109,7 @@ func parseMetadataTopics(r byteReader, header *KafkaRequestHeader) ([]*MetadataT return topics, nil } -func parseMetadataTopic(r byteReader, header *KafkaRequestHeader, isLast bool) (*MetadataTopic, error) { +func parseMetadataTopic(r *largebuf.LargeBufferReader, header KafkaRequestHeader, isLast bool) (*MetadataTopic, error) { var topic MetadataTopic /* Metadata Response (Version: 10, 11, 12 and 13) @@ -129,7 +131,7 @@ func parseMetadataTopic(r byteReader, header *KafkaRequestHeader, isLast bool) ( if err := r.Skip(Int16Len); err != nil { // error_code return nil, err } - isNullable := header.APIVersion >= 12 + isNullable := header.APIVersion() >= 12 topicName, err := readString(r, header, isNullable) if err != nil { return nil, err diff --git a/pkg/internal/ebpf/kafkaparser/metadata_test.go b/pkg/internal/ebpf/kafkaparser/metadata_test.go index 466661ed60..71948e5fd8 100644 --- a/pkg/internal/ebpf/kafkaparser/metadata_test.go +++ b/pkg/internal/ebpf/kafkaparser/metadata_test.go @@ -10,13 +10,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) func TestParseMetadataResponse(t *testing.T) { tests := []struct { name string packet []byte - header *KafkaRequestHeader + header KafkaRequestHeader expectErr bool expectedTopicCount int expectedTopicName string @@ -88,10 +90,7 @@ func TestParseMetadataResponse(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: 12, - }, + header: newTestHeader(APIKeyMetadata, 12), expectErr: false, expectedTopicCount: 1, expectedTopicName: "topic-test", @@ -165,10 +164,7 @@ func TestParseMetadataResponse(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: 12, - }, + header: newTestHeader(APIKeyMetadata, 12), expectErr: false, expectedTopicCount: 1, expectedTopicName: "", // null Name in v12+ @@ -279,10 +275,7 @@ func TestParseMetadataResponse(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: 13, - }, + header: newTestHeader(APIKeyMetadata, 13), expectErr: false, expectedTopicCount: 2, expectedTopicName: "topic1", // We'll check the first topic @@ -319,26 +312,21 @@ func TestParseMetadataResponse(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: 10, - }, + header: newTestHeader(APIKeyMetadata, 10), expectErr: true, // Should error on no Topics }, { - name: "metadata response packet too short", - packet: []byte{0x01, 0x02}, // Too short - header: &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: 10, - }, + name: "metadata response packet too short", + packet: []byte{0x01, 0x02}, // Too short + header: newTestHeader(APIKeyMetadata, 10), expectErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - resp, err := ParseMetadataResponse(newBytesReader(tt.packet), tt.header) + r := largebuf.NewLargeBufferFrom(tt.packet).NewReader() + resp, err := ParseMetadataResponse(&r, tt.header) if tt.expectErr { assert.Error(t, err) @@ -368,10 +356,7 @@ func TestParseMetadataResponseTruncation(t *testing.T) { for _, version := range versions { t.Run(fmt.Sprintf("version_%d_truncation", version), func(t *testing.T) { - header := &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: version, - } + header := newTestHeader(APIKeyMetadata, version) // Create a valid packet for this version validPacket := createValidMetadataPacket(version) @@ -380,7 +365,8 @@ func TestParseMetadataResponseTruncation(t *testing.T) { for i := 1; i < len(validPacket); i++ { t.Run(fmt.Sprintf("truncated_at_%d", i), func(t *testing.T) { truncated := validPacket[:i] - _, err := ParseMetadataResponse(newBytesReader(truncated), header) + r := largebuf.NewLargeBufferFrom(truncated).NewReader() + _, err := ParseMetadataResponse(&r, header) assert.Error(t, err, "expected error for truncated packet at position %d for version %d", i, version) }) } @@ -394,15 +380,13 @@ func TestParseMetadataResponseAllVersions(t *testing.T) { for _, version := range versions { t.Run(fmt.Sprintf("version_%d", version), func(t *testing.T) { - header := &KafkaRequestHeader{ - APIKey: APIKeyMetadata, - APIVersion: version, - } + header := newTestHeader(APIKeyMetadata, version) // Create a valid packet for this version validPacket := createValidMetadataPacket(version) - resp, err := ParseMetadataResponse(newBytesReader(validPacket), header) + r := largebuf.NewLargeBufferFrom(validPacket).NewReader() + resp, err := ParseMetadataResponse(&r, header) require.NoError(t, err, "unexpected error for version %d", version) require.NotNil(t, resp) diff --git a/pkg/internal/ebpf/kafkaparser/produce.go b/pkg/internal/ebpf/kafkaparser/produce.go index dbc3dbb1a0..465b66e5f0 100644 --- a/pkg/internal/ebpf/kafkaparser/produce.go +++ b/pkg/internal/ebpf/kafkaparser/produce.go @@ -3,7 +3,11 @@ package kafkaparser // import "go.opentelemetry.io/obi/pkg/internal/ebpf/kafkaparser" -import "errors" +import ( + "errors" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" +) type ProduceTopic struct { Name string @@ -14,7 +18,7 @@ type ProduceRequest struct { Topics []*ProduceTopic } -func ParseProduceRequest(r byteReader, header *KafkaRequestHeader) (*ProduceRequest, error) { +func ParseProduceRequest(r *largebuf.LargeBufferReader, header KafkaRequestHeader) (*ProduceRequest, error) { if err := produceRequestSkipUntilTopics(r, header); err != nil { return nil, err } @@ -30,7 +34,7 @@ func ParseProduceRequest(r byteReader, header *KafkaRequestHeader) (*ProduceRequ }, nil } -func produceRequestSkipUntilTopics(r byteReader, header *KafkaRequestHeader) error { +func produceRequestSkipUntilTopics(r *largebuf.LargeBufferReader, header KafkaRequestHeader) error { /* Produce Request (Version: 3-12) => transactional_id acks timeout_ms [topic_data] _tagged_fields transactional_id => NULLABLE_STRING / COMPACT_NULLABLE_STRING @@ -49,7 +53,7 @@ func produceRequestSkipUntilTopics(r byteReader, header *KafkaRequestHeader) err ) } -func parseProduceTopics(r byteReader, header *KafkaRequestHeader) ([]*ProduceTopic, error) { +func parseProduceTopics(r *largebuf.LargeBufferReader, header KafkaRequestHeader) ([]*ProduceTopic, error) { topicsLen, err := readArrayLength(r, header) if err != nil { return nil, err @@ -70,7 +74,7 @@ func parseProduceTopics(r byteReader, header *KafkaRequestHeader) ([]*ProduceTop return topics, nil } -func parseProduceTopic(r byteReader, header *KafkaRequestHeader) (*ProduceTopic, error) { +func parseProduceTopic(r *largebuf.LargeBufferReader, header KafkaRequestHeader) (*ProduceTopic, error) { var topic ProduceTopic /* Topics => topic [partitions] _tagged_fields diff --git a/pkg/internal/ebpf/kafkaparser/produce_test.go b/pkg/internal/ebpf/kafkaparser/produce_test.go index edcf98b0a6..f8453de836 100644 --- a/pkg/internal/ebpf/kafkaparser/produce_test.go +++ b/pkg/internal/ebpf/kafkaparser/produce_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" ) var negativeLength int16 = -1 @@ -18,7 +20,7 @@ func TestParseProduceRequest(t *testing.T) { tests := []struct { name string packet []byte - header *KafkaRequestHeader + header KafkaRequestHeader expectErr bool expectedTopicCount int expectedTopicName string @@ -54,10 +56,7 @@ func TestParseProduceRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 3, - }, + header: newTestHeader(APIKeyProduce, 3), expectErr: false, expectedTopicCount: 1, expectedTopicName: "my-topic", @@ -94,10 +93,7 @@ func TestParseProduceRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 8, - }, + header: newTestHeader(APIKeyProduce, 8), expectErr: false, expectedTopicCount: 1, expectedTopicName: "another-topic", @@ -132,10 +128,7 @@ func TestParseProduceRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 9, - }, + header: newTestHeader(APIKeyProduce, 9), expectErr: false, expectedTopicCount: 1, expectedTopicName: "my-topic", @@ -172,10 +165,7 @@ func TestParseProduceRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 12, - }, + header: newTestHeader(APIKeyProduce, 12), expectErr: false, expectedTopicCount: 1, expectedTopicName: "topic1", // We'll check the first topic @@ -217,10 +207,7 @@ func TestParseProduceRequest(t *testing.T) { offset += 4 return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 12, - }, + header: newTestHeader(APIKeyProduce, 12), expectErr: false, expectedTopicCount: 1, expectedTopicName: "topic1", // We'll check the first topic @@ -250,19 +237,13 @@ func TestParseProduceRequest(t *testing.T) { return pkt[:offset] }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 3, - }, + header: newTestHeader(APIKeyProduce, 3), expectErr: true, // Should error on no Topics }, { - name: "produce request packet too short", - packet: []byte{0x01, 0x02}, // Too short - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 3, - }, + name: "produce request packet too short", + packet: []byte{0x01, 0x02}, // Too short + header: newTestHeader(APIKeyProduce, 3), expectErr: true, }, { @@ -273,17 +254,15 @@ func TestParseProduceRequest(t *testing.T) { binary.BigEndian.PutUint16(pkt[0:], 100) // length too large return pkt }(), - header: &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: 3, - }, + header: newTestHeader(APIKeyProduce, 3), expectErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - req, err := ParseProduceRequest(newBytesReader(tt.packet), tt.header) + r := largebuf.NewLargeBufferFrom(tt.packet).NewReader() + req, err := ParseProduceRequest(&r, tt.header) if tt.expectErr { assert.Error(t, err) @@ -312,7 +291,7 @@ func TestProduceRequestSkipUntilTopics(t *testing.T) { tests := []struct { name string packet []byte - header *KafkaRequestHeader + header KafkaRequestHeader expectedOffset int expectErr bool }{ @@ -335,9 +314,7 @@ func TestProduceRequestSkipUntilTopics(t *testing.T) { return pkt }(), - header: &KafkaRequestHeader{ - APIVersion: 5, - }, + header: newUncheckedHeader(APIKeyProduce, 5), expectedOffset: 8, expectErr: false, }, @@ -362,9 +339,7 @@ func TestProduceRequestSkipUntilTopics(t *testing.T) { return pkt }(), - header: &KafkaRequestHeader{ - APIVersion: 7, - }, + header: newUncheckedHeader(APIKeyProduce, 7), expectedOffset: 14, expectErr: false, }, @@ -387,9 +362,7 @@ func TestProduceRequestSkipUntilTopics(t *testing.T) { return pkt }(), - header: &KafkaRequestHeader{ - APIVersion: 9, - }, + header: newUncheckedHeader(APIKeyProduce, 9), expectedOffset: 7, expectErr: false, }, @@ -414,26 +387,20 @@ func TestProduceRequestSkipUntilTopics(t *testing.T) { return pkt }(), - header: &KafkaRequestHeader{ - APIVersion: 10, - }, + header: newUncheckedHeader(APIKeyProduce, 10), expectedOffset: 13, expectErr: false, }, { - name: "packet too short for non-flexible", - packet: make([]byte, 5), - header: &KafkaRequestHeader{ - APIVersion: 5, - }, + name: "packet too short for non-flexible", + packet: make([]byte, 5), + header: newUncheckedHeader(APIKeyProduce, 5), expectErr: true, }, { - name: "packet too short for flexible", - packet: make([]byte, 3), - header: &KafkaRequestHeader{ - APIVersion: 9, - }, + name: "packet too short for flexible", + packet: make([]byte, 3), + header: newUncheckedHeader(APIKeyProduce, 9), expectErr: true, }, { @@ -443,17 +410,15 @@ func TestProduceRequestSkipUntilTopics(t *testing.T) { binary.BigEndian.PutUint16(pkt[0:], 100) // length too large return pkt }(), - header: &KafkaRequestHeader{ - APIVersion: 5, - }, + header: newUncheckedHeader(APIKeyProduce, 5), expectErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newBytesReader(tt.packet) - err := produceRequestSkipUntilTopics(r, tt.header) + r := largebuf.NewLargeBufferFrom(tt.packet).NewReader() + err := produceRequestSkipUntilTopics(&r, tt.header) if tt.expectErr { assert.Error(t, err) @@ -461,7 +426,7 @@ func TestProduceRequestSkipUntilTopics(t *testing.T) { } require.NoError(t, err) - assert.Equal(t, tt.expectedOffset, r.Pos()) + assert.Equal(t, tt.expectedOffset, r.ReadOffset()) }) } } @@ -473,10 +438,7 @@ func TestParseProduceRequestTruncation(t *testing.T) { for _, version := range versions { t.Run(fmt.Sprintf("version_%d_truncation", version), func(t *testing.T) { - header := &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: version, - } + header := newTestHeader(APIKeyProduce, version) // Create a valid packet for this version validPacket := createValidProducePacket(version) @@ -485,7 +447,8 @@ func TestParseProduceRequestTruncation(t *testing.T) { for i := 1; i < len(validPacket); i++ { t.Run(fmt.Sprintf("truncated_at_%d", i), func(t *testing.T) { truncated := validPacket[:i] - _, err := ParseProduceRequest(newBytesReader(truncated), header) + r := largebuf.NewLargeBufferFrom(truncated).NewReader() + _, err := ParseProduceRequest(&r, header) assert.Error(t, err, "expected error for truncated packet at position %d for version %d", i, version) }) } @@ -499,15 +462,13 @@ func TestParseProduceRequestAllVersions(t *testing.T) { for _, version := range versions { t.Run(fmt.Sprintf("version_%d", version), func(t *testing.T) { - header := &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: version, - } + header := newTestHeader(APIKeyProduce, version) // Create a valid packet for this version validPacket := createValidProducePacket(version) - req, err := ParseProduceRequest(newBytesReader(validPacket), header) + r := largebuf.NewLargeBufferFrom(validPacket).NewReader() + req, err := ParseProduceRequest(&r, header) require.NoError(t, err, "unexpected error for version %d", version) require.NotNil(t, req) @@ -659,13 +620,11 @@ func TestParseProduceRequestEdgeCases(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - header := &KafkaRequestHeader{ - APIKey: APIKeyProduce, - APIVersion: tt.version, - } + header := newTestHeader(APIKeyProduce, tt.version) packet := tt.packet() - _, err := ParseProduceRequest(newBytesReader(packet), header) + r := largebuf.NewLargeBufferFrom(packet).NewReader() + _, err := ParseProduceRequest(&r, header) if tt.expectErr { assert.Error(t, err) diff --git a/pkg/internal/ebpf/kafkaparser/reader_test.go b/pkg/internal/ebpf/kafkaparser/reader_test.go deleted file mode 100644 index 85432dc270..0000000000 --- a/pkg/internal/ebpf/kafkaparser/reader_test.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kafkaparser - -import "fmt" - -// bytesReader is a test helper that wraps a []byte to implement the byteReader interface. -// It is used in tests to call parser functions without depending on LargeBuffer. -type bytesReader struct { - data []byte - pos int -} - -func newBytesReader(data []byte) *bytesReader { - return &bytesReader{data: data} -} - -func (r *bytesReader) ReadN(n int) ([]byte, error) { - if r.pos+n > len(r.data) { - return nil, fmt.Errorf("ReadN: requested %d bytes but only %d remaining", n, len(r.data)-r.pos) - } - s := r.data[r.pos : r.pos+n] - r.pos += n - return s, nil -} - -func (r *bytesReader) Peek(n int) ([]byte, error) { - if r.pos+n > len(r.data) { - return nil, fmt.Errorf("Peek: requested %d bytes but only %d remaining", n, len(r.data)-r.pos) - } - return r.data[r.pos : r.pos+n], nil -} - -func (r *bytesReader) Skip(n int) error { - if r.pos+n > len(r.data) { - return fmt.Errorf("Skip: requested %d bytes but only %d remaining", n, len(r.data)-r.pos) - } - r.pos += n - return nil -} - -func (r *bytesReader) Remaining() int { - return len(r.data) - r.pos -} - -// Pos returns the current read position (bytes consumed so far). -func (r *bytesReader) Pos() int { - return r.pos -} diff --git a/pkg/internal/ebpf/kafkaparser/testhelper_test.go b/pkg/internal/ebpf/kafkaparser/testhelper_test.go new file mode 100644 index 0000000000..99222d9347 --- /dev/null +++ b/pkg/internal/ebpf/kafkaparser/testhelper_test.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkaparser + +import ( + "encoding/binary" + + "go.opentelemetry.io/obi/pkg/internal/largebuf" +) + +// newTestHeader creates a minimal valid KafkaRequestHeader for body-parser tests. +func newTestHeader(apiKey KafkaAPIKey, apiVersion int16) KafkaRequestHeader { + flexible := false + switch apiKey { + case APIKeyProduce: + flexible = apiVersion >= 9 + case APIKeyFetch: + flexible = apiVersion >= 12 + case APIKeyMetadata: + flexible = apiVersion >= 9 + } + size := MinKafkaRequestLen + if flexible { + size++ // 0x00 byte for empty tagged-fields varint + } + buf := make([]byte, size) + binary.BigEndian.PutUint32(buf[0:4], uint32(size)) + binary.BigEndian.PutUint16(buf[4:6], uint16(int16(apiKey))) + binary.BigEndian.PutUint16(buf[6:8], uint16(apiVersion)) + binary.BigEndian.PutUint32(buf[8:12], 1) // CorrelationID = 1 + // buf[12:14] = 0 (clientID length = 0) + // buf[14] = 0 if flexible (tagged fields = empty, already 0 from make) + h, err := NewKafkaRequestHeader(largebuf.NewLargeBufferFrom(buf)) + if err != nil { + panic("newTestHeader: " + err.Error()) + } + return h +} + +// newUncheckedHeader creates a KafkaRequestHeader bypassing validation, +// for tests that need to exercise internal helpers with otherwise-invalid field combos. +func newUncheckedHeader(apiKey KafkaAPIKey, apiVersion int16) KafkaRequestHeader { + buf := make([]byte, MinKafkaRequestLen) + binary.BigEndian.PutUint16(buf[4:6], uint16(int16(apiKey))) + binary.BigEndian.PutUint16(buf[6:8], uint16(apiVersion)) + return KafkaRequestHeader{lb: largebuf.NewLargeBufferFrom(buf)} +} + +// buildRawHeaderBuf returns a 14-byte LargeBuffer encoding exactly the four +// fixed header fields. Used by TestValidateKafkaHeader to test validate() in +// isolation without going through NewKafkaRequestHeader. +func buildRawHeaderBuf(msgSize int32, apiKey KafkaAPIKey, apiVersion int16, correlationID int32) *largebuf.LargeBuffer { + buf := make([]byte, MinKafkaRequestLen) + binary.BigEndian.PutUint32(buf[0:4], uint32(msgSize)) + binary.BigEndian.PutUint16(buf[4:6], uint16(int16(apiKey))) + binary.BigEndian.PutUint16(buf[6:8], uint16(apiVersion)) + binary.BigEndian.PutUint32(buf[8:12], uint32(correlationID)) + return largebuf.NewLargeBufferFrom(buf) +} diff --git a/pkg/ebpf/common/large_buffer.go b/pkg/internal/largebuf/large_buffer.go similarity index 97% rename from pkg/ebpf/common/large_buffer.go rename to pkg/internal/largebuf/large_buffer.go index f4335e96df..5c301ccddb 100644 --- a/pkg/ebpf/common/large_buffer.go +++ b/pkg/internal/largebuf/large_buffer.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package ebpfcommon // import "go.opentelemetry.io/obi/pkg/ebpf/common" +package largebuf // import "go.opentelemetry.io/obi/pkg/internal/largebuf" import ( "encoding/binary" @@ -106,10 +106,12 @@ func (lb *LargeBuffer) Reset() { lb.total = 0 } -// NewReader creates a new [LargeBufferReader] positioned at byte 0 of the buffer. +// NewReader returns a new [LargeBufferReader] positioned at byte 0 of the buffer. +// The reader is returned by value; take its address (&r) when passing to functions +// that accept *LargeBufferReader or io.Reader. // Multiple independent readers can operate on the same buffer simultaneously. -func (lb *LargeBuffer) NewReader() *LargeBufferReader { - return &LargeBufferReader{lb: lb} +func (lb *LargeBuffer) NewReader() LargeBufferReader { + return LargeBufferReader{lb: lb} } // ── Absolute-offset access ──────────────────────────────────────────────────── @@ -360,7 +362,7 @@ type LargeBufferReader struct { } // Reset repositions this reader to the beginning of the buffer. -// Equivalent to discarding this reader and calling lb.NewReader(), but without the allocation. +// Equivalent to discarding this reader and calling lb.NewReader(). func (r *LargeBufferReader) Reset() { r.rchunk = 0 r.roff = 0 diff --git a/pkg/ebpf/common/large_buffer_test.go b/pkg/internal/largebuf/large_buffer_test.go similarity index 91% rename from pkg/ebpf/common/large_buffer_test.go rename to pkg/internal/largebuf/large_buffer_test.go index f45d7a309a..4e1642d03a 100644 --- a/pkg/ebpf/common/large_buffer_test.go +++ b/pkg/internal/largebuf/large_buffer_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package ebpfcommon +package largebuf import ( "bufio" @@ -49,7 +49,8 @@ func TestAppendChunk_copiesData(t *testing.T) { // Mutating src must not affect the buffer. src[0] = 'X' - got, err := lb.NewReader().ReadN(5) + r := lb.NewReader() + got, err := r.ReadN(5) require.NoError(t, err) assert.Equal(t, "world", string(got)) } @@ -61,7 +62,8 @@ func TestAppendChunk_multipleChunks(t *testing.T) { lb.AppendChunk([]byte("baz")) assert.Equal(t, 9, lb.Len()) - assert.Equal(t, 9, lb.NewReader().Remaining()) + r := lb.NewReader() + assert.Equal(t, 9, r.Remaining()) } // ── ReadN ───────────────────────────────────────────────────────────────────── @@ -119,7 +121,8 @@ func TestReadN_crossChunk_returnsCorrectBytes(t *testing.T) { lb.AppendChunk([]byte("abc")) lb.AppendChunk([]byte("def")) - got, err := lb.NewReader().ReadN(5) + r := lb.NewReader() + got, err := r.ReadN(5) require.NoError(t, err) assert.Equal(t, "abcde", string(got)) } @@ -145,7 +148,8 @@ func TestReadN_tooManyBytes_returnsError(t *testing.T) { lb := NewLargeBuffer() lb.AppendChunk([]byte("hi")) - _, err := lb.NewReader().ReadN(10) + r := lb.NewReader() + _, err := r.ReadN(10) assert.Error(t, err) } @@ -153,7 +157,8 @@ func TestReadN_zero_returnsNil(t *testing.T) { lb := NewLargeBuffer() lb.AppendChunk([]byte("hi")) - got, err := lb.NewReader().ReadN(0) + r := lb.NewReader() + got, err := r.ReadN(0) require.NoError(t, err) assert.Nil(t, got) } @@ -219,7 +224,8 @@ func TestSkip_tooMany_returnsError(t *testing.T) { lb := NewLargeBuffer() lb.AppendChunk([]byte("hi")) - assert.Error(t, lb.NewReader().Skip(10)) + r := lb.NewReader() + assert.Error(t, r.Skip(10)) } // ── Remaining ──────────────────────────────────────────────────────────────── @@ -278,7 +284,8 @@ func TestRead_ioReaderCompliance(t *testing.T) { lb.AppendChunk([]byte("hello ")) lb.AppendChunk([]byte("world")) - all, err := io.ReadAll(lb.NewReader()) + r := lb.NewReader() + all, err := io.ReadAll(&r) require.NoError(t, err) assert.Equal(t, "hello world", string(all)) } @@ -288,7 +295,7 @@ func TestRead_eoFOnEmpty(t *testing.T) { lb.AppendChunk([]byte("hi")) r := lb.NewReader() - _, _ = io.ReadAll(r) + _, _ = io.ReadAll(&r) n, err := r.Read(make([]byte, 4)) assert.Equal(t, 0, n) @@ -299,7 +306,8 @@ func TestRead_withBufioReader(t *testing.T) { lb := NewLargeBuffer() lb.AppendChunk([]byte("GET / HTTP/1.0\r\nHost: x\r\n\r\n")) - br := bufio.NewReader(lb.NewReader()) + r := lb.NewReader() + br := bufio.NewReader(&r) line, err := br.ReadString('\n') require.NoError(t, err) assert.Equal(t, "GET / HTTP/1.0\r\n", line) @@ -344,7 +352,8 @@ func TestBytes_multiChunk(t *testing.T) { func TestBytes_empty(t *testing.T) { lb := NewLargeBuffer() - assert.Nil(t, lb.NewReader().Bytes()) + r := lb.NewReader() + assert.Nil(t, r.Bytes()) } func TestBytes_afterReadAll_returnsNil(t *testing.T) { @@ -360,7 +369,8 @@ func TestBytes_singleChunk_isSharedView(t *testing.T) { lb := NewLargeBuffer() lb.AppendChunk([]byte("hello")) - got := lb.NewReader().Bytes() + r := lb.NewReader() + got := r.Bytes() // Bytes() returns a view into the internal chunk — mutating it affects the chunk. got[0] = 'X' @@ -371,7 +381,8 @@ func TestBytes_newLargeBufferFrom_isSharedView(t *testing.T) { src := []byte("hello") lb := NewLargeBufferFrom(src) - got := lb.NewReader().Bytes() + r := lb.NewReader() + got := r.Bytes() // Bytes() returns a view into src — mutating it affects the original slice. got[0] = 'X' @@ -462,7 +473,8 @@ func TestReadN_manySmallChunks(t *testing.T) { expected = append(expected, b) } - got, err := lb.NewReader().ReadN(26) + r := lb.NewReader() + got, err := r.ReadN(26) require.NoError(t, err) assert.Equal(t, expected, got) } @@ -473,7 +485,8 @@ func TestReadN_spanThreeChunks(t *testing.T) { lb.AppendChunk([]byte("cd")) lb.AppendChunk([]byte("ef")) - got, err := lb.NewReader().ReadN(5) + r := lb.NewReader() + got, err := r.ReadN(5) require.NoError(t, err) assert.Equal(t, "abcde", string(got)) } @@ -1002,3 +1015,46 @@ func TestMultipleReaders_independent(t *testing.T) { require.NoError(t, err) assert.Equal(t, "def", string(got1)) } + +// ── Heap-escape verification ────────────────────────────────────────────────── + +// TestNewReader_zeroAllocs verifies that NewReader() does not heap-allocate the +// returned LargeBufferReader. The reader is assigned to a local variable and used +// only within the same call frame, so escape analysis can (and must) stack-allocate it. +func TestNewReader_zeroAllocs(t *testing.T) { + lb := NewLargeBufferFrom(bytes.Repeat([]byte{0xAB}, 32)) + + allocs := testing.AllocsPerRun(1000, func() { + r := lb.NewReader() + // Read a few fields to ensure the compiler doesn't elide the reader. + b, _ := r.ReadN(4) + _ = b[0] + b, _ = r.ReadN(4) + _ = b[0] + }) + + assert.InDelta(t, float64(0), allocs, 0, "NewReader() must not heap-allocate the LargeBufferReader") +} + +// TestNewReader_multiChunk_zeroAllocs verifies that LargeBufferReader does not +// heap-allocate on cross-chunk reads once the scratch buffer is warmed up. +// The reader is reused via Reset() so that scratch reuse is exercised without +// re-allocating a new reader (and new scratch slice) on each iteration. +func TestNewReader_multiChunk_zeroAllocs(t *testing.T) { + lb := NewLargeBuffer() + lb.AppendChunk(bytes.Repeat([]byte{0x01}, 8)) + lb.AppendChunk(bytes.Repeat([]byte{0x02}, 8)) + + // Create a single reader and warm up its scratch buffer with a cross-chunk read. + r := lb.NewReader() + _, _ = r.ReadN(12) + + allocs := testing.AllocsPerRun(1000, func() { + r.Reset() + // Cross-chunk read reuses the already-allocated scratch — zero new allocations. + b, _ := r.ReadN(12) + _ = b[0] + }) + + assert.InDelta(t, float64(0), allocs, 0, "cross-chunk ReadN must not allocate after scratch warm-up") +}