From 7a218fe96fdf77358da4b795d793aaae3e3b97b2 Mon Sep 17 00:00:00 2001 From: Rafael Roquetto Date: Thu, 5 Mar 2026 12:09:10 -0700 Subject: [PATCH 1/3] Better use large buffer with redis --- pkg/ebpf/common/http/sqlpp.go | 6 +- pkg/ebpf/common/redis_detect_transform.go | 53 ++++++------ .../common/redis_detect_transform_test.go | 12 +-- pkg/ebpf/common/tcp_detect_transform.go | 4 +- pkg/export/attributes/env.go | 2 +- pkg/internal/split/splititerator.go | 40 +++++---- pkg/internal/split/splititerator_test.go | 83 ++++++++++++++++++- 7 files changed, 143 insertions(+), 57 deletions(-) diff --git a/pkg/ebpf/common/http/sqlpp.go b/pkg/ebpf/common/http/sqlpp.go index 69743ff5a2..28d552982f 100644 --- a/pkg/ebpf/common/http/sqlpp.go +++ b/pkg/ebpf/common/http/sqlpp.go @@ -110,7 +110,7 @@ func hasN1QLVersion(resp *http.Response) bool { } contentType := resp.Header.Get("Content-Type") // Split by semicolons to get individual parameters - iter := split.NewIterator(contentType, ";") + iter := split.NewStringIterator(contentType, ";") for part, eof := iter.Next(); !eof; part, eof = iter.Next() { part = strings.TrimSuffix(part, ";") part = strings.TrimSpace(part) @@ -134,7 +134,7 @@ func parseSQLPPTablePath(table string, hasQueryContext bool) (bucket, collection } // Count parts first - iter := split.NewIterator(table, ".") + iter := split.NewStringIterator(table, ".") count := 0 for _, eof := iter.Next(); !eof; _, eof = iter.Next() { count++ @@ -198,7 +198,7 @@ func parseSQLPPRequest(req *http.Request) (*sqlppRequest, error) { // extractFormValue extracts a value from form-encoded data func extractFormValue(data, key string) string { - iter := split.NewIterator(data, "&") + iter := split.NewStringIterator(data, "&") for pair, eof := iter.Next(); !eof; pair, eof = iter.Next() { pair = strings.TrimSuffix(pair, "&") // Split by first "=" only (equivalent to SplitN with 2) diff --git a/pkg/ebpf/common/redis_detect_transform.go b/pkg/ebpf/common/redis_detect_transform.go index 8a420dbfc2..a89d7bb9e9 100644 --- a/pkg/ebpf/common/redis_detect_transform.go +++ b/pkg/ebpf/common/redis_detect_transform.go @@ -22,15 +22,18 @@ import ( const minRedisFrameLen = 3 -var redisErrorCodes = [...]string{ - "ERR ", - "WRONGTYPE ", - "MOVED ", - "ASK ", - "BUSY ", - "NOSCRIPT ", - "CLUSTERDOWN ", - "READONLY ", +var redisErrors = [...]struct { + prefix []byte + code string +}{ + {[]byte("ERR "), "ERR"}, + {[]byte("WRONGTYPE "), "WRONGTYPE"}, + {[]byte("MOVED "), "MOVED"}, + {[]byte("ASK "), "ASK"}, + {[]byte("BUSY "), "BUSY"}, + {[]byte("NOSCRIPT "), "NOSCRIPT"}, + {[]byte("CLUSTERDOWN "), "CLUSTERDOWN"}, + {[]byte("READONLY "), "READONLY"}, } func isRedis(buf *largebuf.LargeBuffer) bool { @@ -65,12 +68,12 @@ func isRedisOp(buf []uint8) bool { } func getRedisError(buf []uint8) (request.DBError, bool) { - description := strings.Trim(string(buf), "\r\n") + description := string(bytes.Trim(buf, "\r\n")) errorCode := "" - for _, redisErrorCode := range redisErrorCodes { - if bytes.HasPrefix(buf, []byte(redisErrorCode)) { - errorCode = strings.TrimSpace(redisErrorCode) + for _, e := range redisErrors { + if bytes.HasPrefix(buf, e.prefix) { + errorCode = e.code break } } @@ -111,10 +114,10 @@ func isValidRedisChar(c byte) bool { c == '.' || c == ' ' || c == '-' || c == '_' } -func parseRedisRequest(buf string) (string, string, bool) { +func parseRedisRequest(buf []byte) (string, string, bool) { const redisDelim = "\r\n" - lines := split.NewIterator(buf, redisDelim) + lines := split.NewBytesIterator(buf, []byte(redisDelim)) _, eof := lines.Next() @@ -150,18 +153,18 @@ func parseRedisRequest(buf string) (string, string, bool) { break } - if line == redisDelim { + if bytes.Equal(line, []byte(redisDelim)) { continue } if !read { - if isRedisOp([]uint8(line)) { + if isRedisOp(line) { read = true } else { break } } else { - if isRedisOp([]uint8(line)) { + if isRedisOp(line) { text.WriteString("; ") continue } @@ -169,12 +172,12 @@ func parseRedisRequest(buf string) (string, string, bool) { break } - trimmed := strings.TrimSuffix(line, redisDelim) + trimmed := bytes.TrimSuffix(line, []byte(redisDelim)) if op == "" { - op = trimmed + op = string(trimmed) } - text.WriteString(trimmed) + text.Write(trimmed) text.WriteString(" ") read = false } @@ -204,8 +207,8 @@ func getRedisDB(connInfo BpfConnectionInfoT, op, text string, dbCache *simplelru return -1, false } db, found := dbCache.Get(connInfo) - switch strings.ToUpper(op) { - case "SELECT": + switch { + case strings.EqualFold(op, "SELECT"): // get db number from text after first space if text != "" { parts := strings.Split(text, " ") @@ -215,7 +218,7 @@ func getRedisDB(connInfo BpfConnectionInfoT, op, text string, dbCache *simplelru } } } - case "QUIT": + case strings.EqualFold(op, "QUIT"): dbCache.Remove(connInfo) } return db, found @@ -283,7 +286,7 @@ func ReadGoRedisRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, err hostPort = int(event.Conn.D_port) } - op, text, ok := parseRedisRequest(string(event.Buf[:])) + op, text, ok := parseRedisRequest(event.Buf[:]) if !ok { // We know it's redis request here, it just didn't complete correctly diff --git a/pkg/ebpf/common/redis_detect_transform_test.go b/pkg/ebpf/common/redis_detect_transform_test.go index f2a9e8de40..1a99922636 100644 --- a/pkg/ebpf/common/redis_detect_transform_test.go +++ b/pkg/ebpf/common/redis_detect_transform_test.go @@ -36,38 +36,38 @@ func TestCRLFMatching(t *testing.T) { } func TestRedisParsing(t *testing.T) { - proper := "*2\r\n$3\r\nGET\r\n$5\r\nbeyla" + proper := []byte("*2\r\n$3\r\nGET\r\n$5\r\nbeyla") op, text, ok := parseRedisRequest(proper) assert.True(t, ok) assert.Equal(t, "GET", op) assert.Equal(t, "GET beyla", text) - weird := "*2\r\nGET\r\nbeyla" + weird := []byte("*2\r\nGET\r\nbeyla") op, text, ok = parseRedisRequest(weird) assert.True(t, ok) assert.Empty(t, op) assert.Empty(t, text) - unknown := "2\r\nGET\r\nbeyla" + unknown := []byte("2\r\nGET\r\nbeyla") op, text, ok = parseRedisRequest(unknown) assert.True(t, ok) assert.Empty(t, op) assert.Empty(t, text) - op, text, ok = parseRedisRequest("2") + op, text, ok = parseRedisRequest([]byte("2")) assert.False(t, ok) assert.Empty(t, op) assert.Empty(t, text) - multi := fmt.Sprintf("*4\r\n$6\r\nclient\r\n$7\r\nsetinfo\r\n$8\r\nLIB-NAME\r\n$19\r\n%s(,go1.22.2)\r\n*4\r\n$6\r\nclient\r\n$7\r\nsetinfo\r\n$7\r\nLIB-VER\r\n$5\r\n9.5.1\r\n", "go-redis") + multi := []byte(fmt.Sprintf("*4\r\n$6\r\nclient\r\n$7\r\nsetinfo\r\n$8\r\nLIB-NAME\r\n$19\r\n%s(,go1.22.2)\r\n*4\r\n$6\r\nclient\r\n$7\r\nsetinfo\r\n$7\r\nLIB-VER\r\n$5\r\n9.5.1\r\n", "go-redis")) op, text, ok = parseRedisRequest(multi) assert.True(t, ok) assert.Equal(t, "client", op) assert.Equal(t, "client setinfo LIB-NAME go-redis(,go1.22.2) ; client setinfo LIB-VER 9.5.1", text) hmset := []byte{42, 52, 13, 10, 36, 53, 13, 10, 72, 77, 83, 69, 84, 13, 10, 36, 51, 54, 13, 10, 48, 99, 57, 102, 97, 56, 97, 97, 45, 50, 56, 49, 102, 45, 49, 49, 101, 102, 45, 57, 55, 98, 57, 45, 98, 101, 57, 54, 48, 48, 99, 97, 48, 102, 50, 55, 13, 10, 36, 52, 13, 10, 99, 97, 114, 116, 13, 10, 36, 53, 52, 13, 10, 10, 36, 48, 99, 57, 102, 97, 56, 97, 97, 45, 50, 56, 49, 102, 45, 49, 49, 101, 102, 45, 57, 55, 98, 57, 45, 98, 101, 57, 54, 48, 48, 99, 97, 48, 102, 50, 55, 18, 14, 10, 10, 79, 76, 74, 67, 69, 83, 80, 67, 55, 90, 16, 5, 13, 10, 0, 10, 72, 81, 84, 71, 87, 71, 80, 78, 72, 52, 16, 1, 13, 10, 0, 10, 49, 89, 77, 87, 87, 78, 49, 78, 52, 79, 16, 5, 13, 10, 0, 10, 10, 57, 83, 73, 81, 84, 56, 84, 79, 74, 79, 16, 5, 13, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} - op, text, ok = parseRedisRequest(string(hmset)) + op, text, ok = parseRedisRequest(hmset) assert.True(t, ok) assert.Equal(t, "HMSET", op) diff --git a/pkg/ebpf/common/tcp_detect_transform.go b/pkg/ebpf/common/tcp_detect_transform.go index 8bba961586..2c1557c8c5 100644 --- a/pkg/ebpf/common/tcp_detect_transform.go +++ b/pkg/ebpf/common/tcp_detect_transform.go @@ -138,13 +138,13 @@ func ReadTCPRequestIntoSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, switch { case isRedis(requestBuffer) && isRedis(responseBuffer): - op, text, ok := parseRedisRequest(string(requestBuffer.UnsafeView())) + op, text, ok := parseRedisRequest(requestBuffer.UnsafeView()) if ok { var status int var redisErr request.DBError if op == "" { - op, text, ok = parseRedisRequest(string(responseBuffer.UnsafeView())) + op, text, ok = parseRedisRequest(responseBuffer.UnsafeView()) if !ok || op == "" { return request.Span{}, true, nil // ignore if we couldn't parse it } diff --git a/pkg/export/attributes/env.go b/pkg/export/attributes/env.go index e4de1a5cb3..accc468f88 100644 --- a/pkg/export/attributes/env.go +++ b/pkg/export/attributes/env.go @@ -12,7 +12,7 @@ import ( type VarHandler func(k string, v string) func ParseOTELResourceVariable(envVar string, handler VarHandler) { - variables := split.NewIterator(envVar, ",") + variables := split.NewStringIterator(envVar, ",") for { variable, eof := variables.Next() diff --git a/pkg/internal/split/splititerator.go b/pkg/internal/split/splititerator.go index 06aed5932e..ff510a66fc 100644 --- a/pkg/internal/split/splititerator.go +++ b/pkg/internal/split/splititerator.go @@ -6,34 +6,37 @@ package split // import "go.opentelemetry.io/obi/pkg/internal/split" import ( + "bytes" "strings" ) -// Iterator is alternative to strings.Split(str, delim) - each call to Nex() -// returns a a substring slice, allowing string tokens or lines to be processed -// in place (zero-copy), without the need of allocations -type Iterator struct { - startBuf string - buf string - delim string +// Iterator is an alternative to strings.Split / bytes.Split — each call to +// Next returns a sub-slice of the original, allowing tokens or lines to be +// processed in place without allocations. +type Iterator[T string | []byte] struct { + startBuf T + buf T + delim T + indexOf func(T, T) int } -func NewIterator(buf string, delim string) *Iterator { - return &Iterator{ - startBuf: buf, - buf: buf, - delim: delim, - } +func NewStringIterator(buf, delim string) Iterator[string] { + return Iterator[string]{startBuf: buf, buf: buf, delim: delim, indexOf: strings.Index} +} + +func NewBytesIterator(buf, delim []byte) Iterator[[]byte] { + return Iterator[[]byte]{startBuf: buf, buf: buf, delim: delim, indexOf: bytes.Index} } // Next returns a token and false if there are any tokens available, otherwise -// returns "" and true to convey EOF has been reached -func (sp *Iterator) Next() (string, bool) { +// returns the zero value and true to convey EOF has been reached. +func (sp *Iterator[T]) Next() (T, bool) { if len(sp.buf) == 0 { - return "", true + var zero T + return zero, true } - index := strings.Index(sp.buf, sp.delim) + index := sp.indexOf(sp.buf, sp.delim) if index == -1 { buf := sp.buf @@ -49,6 +52,7 @@ func (sp *Iterator) Next() (string, bool) { return buf, false } -func (sp *Iterator) Reset() { +// Reset repositions the iterator to the beginning of the buffer. +func (sp *Iterator[T]) Reset() { sp.buf = sp.startBuf } diff --git a/pkg/internal/split/splititerator_test.go b/pkg/internal/split/splititerator_test.go index 6a680835ca..57491f267f 100644 --- a/pkg/internal/split/splititerator_test.go +++ b/pkg/internal/split/splititerator_test.go @@ -15,7 +15,22 @@ type testInput struct { } func runTest(t *testing.T, in string, delim string, expected []testInput) { - sp := NewIterator(in, delim) + sp := NewStringIterator(in, delim) + + for _, e := range expected { + w, eof := sp.Next() + assert.Equal(t, e.eof, eof) + assert.Equal(t, e.token, w) + } +} + +type bytesTestInput struct { + token []byte + eof bool +} + +func runBytesTest(t *testing.T, in []byte, delim []byte, expected []bytesTestInput) { + sp := NewBytesIterator(in, delim) for _, e := range expected { w, eof := sp.Next() @@ -77,7 +92,7 @@ func TestSplitIterator_multi(t *testing.T) { func TestSplitIterator_reset(t *testing.T) { in := "one|line|per|time|" - sp := NewIterator(in, "|") + sp := NewStringIterator(in, "|") w, eof := sp.Next() assert.False(t, eof) @@ -93,3 +108,67 @@ func TestSplitIterator_reset(t *testing.T) { assert.False(t, eof) assert.Equal(t, "one|", w) } + +func TestBytesIterator(t *testing.T) { + in := []byte("ab;cd;;fg;") + + expected := []bytesTestInput{ + {token: []byte("ab;"), eof: false}, + {token: []byte("cd;"), eof: false}, + {token: []byte(";"), eof: false}, + {token: []byte("fg;"), eof: false}, + {token: nil, eof: true}, + } + + runBytesTest(t, in, []byte(";"), expected) +} + +func TestBytesIterator_empty(t *testing.T) { + runBytesTest(t, []byte{}, []byte(";"), []bytesTestInput{{token: nil, eof: true}}) +} + +func TestBytesIterator_lead_trail(t *testing.T) { + in := []byte("oo;oo") + + expected := []bytesTestInput{ + {token: []byte("oo;"), eof: false}, + {token: []byte("oo"), eof: false}, + {token: nil, eof: true}, + } + + runBytesTest(t, in, []byte(";"), expected) +} + +func TestBytesIterator_multi(t *testing.T) { + in := []byte("one\r\nline\r\nper\r\ntime\r\n") + + expected := []bytesTestInput{ + {token: []byte("one\r\n"), eof: false}, + {token: []byte("line\r\n"), eof: false}, + {token: []byte("per\r\n"), eof: false}, + {token: []byte("time\r\n"), eof: false}, + {token: nil, eof: true}, + } + + runBytesTest(t, in, []byte("\r\n"), expected) +} + +func TestBytesIterator_reset(t *testing.T) { + in := []byte("one|line|per|time|") + + sp := NewBytesIterator(in, []byte("|")) + + w, eof := sp.Next() + assert.False(t, eof) + assert.Equal(t, []byte("one|"), w) + + w, eof = sp.Next() + assert.False(t, eof) + assert.Equal(t, []byte("line|"), w) + + sp.Reset() + + w, eof = sp.Next() + assert.False(t, eof) + assert.Equal(t, []byte("one|"), w) +} From 343852eaf696a9396cce933ee1f83290e1ee9b34 Mon Sep 17 00:00:00 2001 From: Rafael Roquetto Date: Thu, 5 Mar 2026 15:27:10 -0700 Subject: [PATCH 2/3] Review feedback --- pkg/ebpf/common/redis_detect_transform.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/ebpf/common/redis_detect_transform.go b/pkg/ebpf/common/redis_detect_transform.go index a89d7bb9e9..4873b91c50 100644 --- a/pkg/ebpf/common/redis_detect_transform.go +++ b/pkg/ebpf/common/redis_detect_transform.go @@ -20,7 +20,12 @@ import ( "go.opentelemetry.io/obi/pkg/internal/split" ) -const minRedisFrameLen = 3 +const ( + minRedisFrameLen = 3 + redisDelim = "\r\n" +) + +var redisDelimBytes = []byte(redisDelim) var redisErrors = [...]struct { prefix []byte @@ -115,9 +120,7 @@ func isValidRedisChar(c byte) bool { } func parseRedisRequest(buf []byte) (string, string, bool) { - const redisDelim = "\r\n" - - lines := split.NewBytesIterator(buf, []byte(redisDelim)) + lines := split.NewBytesIterator(buf, redisDelimBytes) _, eof := lines.Next() @@ -153,7 +156,7 @@ func parseRedisRequest(buf []byte) (string, string, bool) { break } - if bytes.Equal(line, []byte(redisDelim)) { + if bytes.Equal(line, redisDelimBytes) { continue } @@ -172,7 +175,7 @@ func parseRedisRequest(buf []byte) (string, string, bool) { break } - trimmed := bytes.TrimSuffix(line, []byte(redisDelim)) + trimmed := bytes.TrimSuffix(line, redisDelimBytes) if op == "" { op = string(trimmed) From 7a34330d2d43c841407e516a2ef239dbba4c9993 Mon Sep 17 00:00:00 2001 From: Rafael Roquetto Date: Thu, 5 Mar 2026 15:36:47 -0700 Subject: [PATCH 3/3] Review feedback --- pkg/internal/split/splititerator.go | 10 ++++++++-- pkg/internal/split/splititerator_test.go | 8 ++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/pkg/internal/split/splititerator.go b/pkg/internal/split/splititerator.go index ff510a66fc..b62005fd26 100644 --- a/pkg/internal/split/splititerator.go +++ b/pkg/internal/split/splititerator.go @@ -1,8 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package split provides an Iterator that allows for zero-copy string -// splitting. +// Package split provides a generic Iterator for zero-copy splitting of +// strings and byte slices. package split // import "go.opentelemetry.io/obi/pkg/internal/split" import ( @@ -21,10 +21,16 @@ type Iterator[T string | []byte] struct { } func NewStringIterator(buf, delim string) Iterator[string] { + if len(delim) == 0 { + panic("split: empty delimiter") + } return Iterator[string]{startBuf: buf, buf: buf, delim: delim, indexOf: strings.Index} } func NewBytesIterator(buf, delim []byte) Iterator[[]byte] { + if len(delim) == 0 { + panic("split: empty delimiter") + } return Iterator[[]byte]{startBuf: buf, buf: buf, delim: delim, indexOf: bytes.Index} } diff --git a/pkg/internal/split/splititerator_test.go b/pkg/internal/split/splititerator_test.go index 57491f267f..80838c7b45 100644 --- a/pkg/internal/split/splititerator_test.go +++ b/pkg/internal/split/splititerator_test.go @@ -153,6 +153,14 @@ func TestBytesIterator_multi(t *testing.T) { runBytesTest(t, in, []byte("\r\n"), expected) } +func TestStringIterator_emptyDelim_panics(t *testing.T) { + assert.Panics(t, func() { NewStringIterator("abc", "") }) +} + +func TestBytesIterator_emptyDelim_panics(t *testing.T) { + assert.Panics(t, func() { NewBytesIterator([]byte("abc"), []byte{}) }) +} + func TestBytesIterator_reset(t *testing.T) { in := []byte("one|line|per|time|")