Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ebpf/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type EBPFParseContext struct {
h2c *lru.Cache[uint64, h2Connection]
redisDBCache *simplelru.LRU[BpfConnectionInfoT, int]
couchbaseBucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]
largeBuffers *expirable.LRU[largeBufferKey, *largeBuffer]
largeBuffers *expirable.LRU[largeBufferKey, *LargeBuffer]
mongoRequestCache PendingMongoDBRequests
mysqlPreparedStatements *simplelru.LRU[mysqlPreparedStatementsKey, string]
postgresPreparedStatements *simplelru.LRU[postgresPreparedStatementsKey, string]
Expand Down Expand Up @@ -213,7 +213,7 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request.
)

h2c, _ := lru.New[uint64, h2Connection](1024 * 10)
largeBuffers := expirable.NewLRU[largeBufferKey, *largeBuffer](1024, nil, 5*time.Minute)
largeBuffers := expirable.NewLRU[largeBufferKey, *LargeBuffer](1024, nil, 5*time.Minute)

if cfg != nil {
protocolDebug = cfg.ProtocolDebug
Expand Down
8 changes: 5 additions & 3 deletions pkg/ebpf/common/couchbase_detect_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ type CouchbaseInfo struct {
// ProcessPossibleCouchbaseEvent attempts to parse the event as a Couchbase memcached binary protocol event.
// Returns a slice of CouchbaseInfo if successful, along with a boolean indicating if the event should be ignored,
// and an error if parsing failed. Multiple packets may be present in a single TCP segment due to pipelining.
func ProcessPossibleCouchbaseEvent(event *TCPRequestInfo, requestBuf []byte, responseBuf []byte, bucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]) (*CouchbaseInfo, bool, error) {
info, ignore, err := processCouchbaseEvent(event.ConnInfo, requestBuf, responseBuf, bucketCache)
func ProcessPossibleCouchbaseEvent(event *TCPRequestInfo, requestBuf *LargeBuffer, responseBuf *LargeBuffer, bucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]) (*CouchbaseInfo, bool, error) {
reqRaw := requestBuf.UnsafeView()
respRaw := responseBuf.UnsafeView()
info, ignore, err := processCouchbaseEvent(event.ConnInfo, reqRaw, respRaw, bucketCache)
// If parsing failed (error or no valid packets found), try with buffers reversed
if err != nil {
// Try with buffers reversed - we might have captured it backwards
info, ignore, err = processCouchbaseEvent(event.ConnInfo, responseBuf, requestBuf, bucketCache)
info, ignore, err = processCouchbaseEvent(event.ConnInfo, respRaw, reqRaw, bucketCache)
if err == nil {
reverseTCPEvent(event)
return info, false, nil
Expand Down
16 changes: 8 additions & 8 deletions pkg/ebpf/common/couchbase_detect_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func TestProcessPossibleCouchbaseEventReversedBuffers(t *testing.T) {
}

// Test with buffers in correct order
info, ignore, err := ProcessPossibleCouchbaseEvent(event, requestBuf, responseBuf, cache)
info, ignore, err := ProcessPossibleCouchbaseEvent(event, NewLargeBufferFrom(requestBuf), NewLargeBufferFrom(responseBuf), cache)
require.NoError(t, err)
assert.False(t, ignore)
assert.Equal(t, "GET", info.Operation)
Expand All @@ -563,7 +563,7 @@ func TestProcessPossibleCouchbaseEventReversedBuffers(t *testing.T) {
ConnInfo: connInfo,
Direction: 1,
}
info, ignore, err = ProcessPossibleCouchbaseEvent(event2, garbageBuf, requestBuf, cache)
info, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(garbageBuf), NewLargeBufferFrom(requestBuf), cache)
require.NoError(t, err)
assert.False(t, ignore)
require.NotNil(t, info)
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {
selectBucket1Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeSelectBucket, couchbasekv.StatusSuccess, "")

event1 := &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1}
_, ignore, err := ProcessPossibleCouchbaseEvent(event1, selectBucket1Req, selectBucket1Resp, cache)
_, ignore, err := ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(selectBucket1Req), NewLargeBufferFrom(selectBucket1Resp), cache)
require.NoError(t, err)
assert.True(t, ignore) // SELECT_BUCKET is ignored

Expand All @@ -603,7 +603,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {
selectBucket2Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeSelectBucket, couchbasekv.StatusSuccess, "")

event2 := &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1}
_, ignore, err = ProcessPossibleCouchbaseEvent(event2, selectBucket2Req, selectBucket2Resp, cache)
_, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(selectBucket2Req), NewLargeBufferFrom(selectBucket2Resp), cache)
require.NoError(t, err)
assert.True(t, ignore)

Expand All @@ -612,7 +612,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {
getCollID1Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeCollectionsGetID, couchbasekv.StatusSuccess, "")

event1 = &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1}
_, ignore, err = ProcessPossibleCouchbaseEvent(event1, getCollID1Req, getCollID1Resp, cache)
_, ignore, err = ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(getCollID1Req), NewLargeBufferFrom(getCollID1Resp), cache)
require.NoError(t, err)
assert.True(t, ignore)

Expand All @@ -621,7 +621,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {
getCollID2Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeCollectionsGetID, couchbasekv.StatusSuccess, "")

event2 = &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1}
_, ignore, err = ProcessPossibleCouchbaseEvent(event2, getCollID2Req, getCollID2Resp, cache)
_, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(getCollID2Req), NewLargeBufferFrom(getCollID2Resp), cache)
require.NoError(t, err)
assert.True(t, ignore)

Expand All @@ -631,7 +631,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {

// GET on connection 1 should have bucket1/scope1/coll1
event1 = &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1}
info1, ignore, err := ProcessPossibleCouchbaseEvent(event1, getReq, getResp, cache)
info1, ignore, err := ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(getReq), NewLargeBufferFrom(getResp), cache)
require.NoError(t, err)
assert.False(t, ignore)
require.NotNil(t, info1)
Expand All @@ -641,7 +641,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {

// GET on connection 2 should have bucket2/scope2/coll2
event2 = &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1}
info2, ignore, err := ProcessPossibleCouchbaseEvent(event2, getReq, getResp, cache)
info2, ignore, err := ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(getReq), NewLargeBufferFrom(getResp), cache)
require.NoError(t, err)
assert.False(t, ignore)
require.NotNil(t, info2)
Expand Down
56 changes: 29 additions & 27 deletions pkg/ebpf/common/fast_cgi_detect_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,50 +108,51 @@ func parseCGITable(b []byte) map[string]string {
return res
}

func maybeFastCGI(b []byte) bool {
if len(b) <= fastCGIRequestHeaderLen {
func maybeFastCGI(b *LargeBuffer) bool {
if b.Len() <= fastCGIRequestHeaderLen {
return false
}

methodPos := bytes.Index(b, []byte(requestMethodKey))

return methodPos >= 0
return bytes.Contains(b.UnsafeView(), []byte(requestMethodKey))
}

func parseHeader(b []byte) ([]byte, error) {
func parseHeader(b *LargeBuffer) ([]byte, error) {
r := b.NewReader()
for {
hdr, err := readFastCGIHeader(b)
if r.Remaining() < fastCGIRequestHeaderLen {
return nil, errors.New("payload too short")
}
hdrBytes, err := r.ReadN(fastCGIRequestHeaderLen)
if err != nil {
return nil, errors.New("payload too short")
}
hdr, err := readFastCGIHeader(hdrBytes)
if err != nil {
return nil, errors.New("payload too short")
}

if hdr.Type == fcgiFrameTypeParams {
if len(b) <= fastCGIRequestHeaderLen {
if r.Remaining() == 0 {
return nil, errors.New("payload too short")
}
b = b[fastCGIRequestHeaderLen:]
break
rest, _ := r.ReadN(r.Remaining())
return rest, nil
}
payloadOffset := int(fastCGIRequestHeaderLen + hdr.ContentLength + uint16(hdr.PaddingLength))
if len(b) <= payloadOffset {
payloadOffset := int(hdr.ContentLength) + int(hdr.PaddingLength)
if err := r.Skip(payloadOffset); err != nil {
return nil, errors.New("payload too short")
}
b = b[payloadOffset:]
}

return b, nil
}

func detectFastCGI(b, rb []byte) (string, string, int) {
var err error
b, err = parseHeader(b)
func detectFastCGI(b, rb *LargeBuffer) (string, string, int) {
raw, err := parseHeader(b)
if err != nil {
return "", "", -1
}

methodPos := bytes.Index(b, []byte(requestMethodKey))
methodPos := bytes.Index(raw, []byte(requestMethodKey))
if methodPos >= 0 {
kv := parseCGITable(b)
kv := parseCGITable(raw)

method, ok := kv[requestMethodKey]
if !ok {
Expand All @@ -162,17 +163,18 @@ func detectFastCGI(b, rb []byte) (string, string, int) {
// Translate the status code into HTTP, 200 OK, 500 ERR
status := 200

if len(rb) >= 2 {
if rb[1] == responseError {
rbRaw := rb.UnsafeView()
if len(rbRaw) >= 2 {
if rbRaw[1] == responseError {
status = 500
}

statusPos := bytes.Index(rb, []byte(responseStatusKey))
statusPos := bytes.Index(rbRaw, []byte(responseStatusKey))
if statusPos >= 0 {
rb = rb[statusPos+len(responseStatusKey):]
nextSpace := bytes.Index(rb, []byte(" "))
rbRaw = rbRaw[statusPos+len(responseStatusKey):]
nextSpace := bytes.Index(rbRaw, []byte(" "))
if nextSpace > 0 {
statusStr := string(rb[:nextSpace])
statusStr := string(rbRaw[:nextSpace])
if parsed, err := strconv.ParseInt(statusStr, 10, 32); err == nil {
status = int(parsed)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ebpf/common/fast_cgi_detect_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestMaybeFastCGI(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ilen := min(len(tt.input), tt.inputLen)
res := maybeFastCGI(tt.input[0:ilen])
res := maybeFastCGI(NewLargeBufferFrom(tt.input[0:ilen]))
assert.Equal(t, tt.expected, res)
})
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestDetectFastCGI(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ilen := min(len(tt.input), tt.inputLen)
olen := min(len(tt.output), tt.outputLen)
method, path, status := detectFastCGI(tt.input[0:ilen], tt.output[0:olen])
method, path, status := detectFastCGI(NewLargeBufferFrom(tt.input[0:ilen]), NewLargeBufferFrom(tt.output[0:olen]))
assert.Equal(t, tt.expectedMethod, method)
assert.Equal(t, tt.expectedPath, path)
assert.Equal(t, tt.expectedResult, status)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ebpf/common/go_kafka_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func ReadGoSaramaRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, er
return request.Span{}, true, err
}

info, ignore, err := ProcessKafkaRequest(event.Buf[:], nil)
info, ignore, err := ProcessKafkaRequest(NewLargeBufferFrom(event.Buf[:]).NewReader(), nil)

if err == nil && !ignore {
return GoKafkaSaramaToSpan(event, info), false, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/ebpf/common/http2grpc_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,16 +544,16 @@ func isLikelyHTTP2(data []uint8, eventLen int) bool {
return false
}

func isHTTP2(data []uint8, eventLen int) bool {
func isHTTP2(data *LargeBuffer, eventLen int) bool {
// Parsing HTTP2 frames with the Go HTTP2/gRPC parser is very expensive.
// Therefore, we replicate some of our HTTP2 frame reader from eBPF here to
// check if this payload even remotely looks like HTTP2/gRPC, e.g. we must
// find a resonably looking HTTP "headers" frame.
if !isLikelyHTTP2(data, eventLen) {
if !isLikelyHTTP2(data.UnsafeView(), eventLen) {
return false
}

framer := byteFramer(data)
framer := http2.NewFramer(io.Discard, data.NewReader())

for {
f, err := framer.ReadFrame()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ebpf/common/http2grpc_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestHTTP2QuickDetection(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
res := isLikelyHTTP2(tt.input, tt.inputLen)
assert.Equal(t, tt.expectedQuick, res)
res1 := isHTTP2(tt.input, tt.inputLen)
res1 := isHTTP2(NewLargeBufferFrom(tt.input), tt.inputLen)
assert.Equal(t, tt.expected, res1)
})
}
Expand Down Expand Up @@ -535,7 +535,7 @@ func TestHandleHeaderField(t *testing.T) {
func BenchmarkIsHTTP2(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, tt := range isHTTP2TestCases {
_ = isHTTP2(tt.input, tt.inputLen)
_ = isHTTP2(NewLargeBufferFrom(tt.input), tt.inputLen)
}
}
}
Loading