Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/ebpf/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/obi/pkg/ebpf/common/dnsparser"
"go.opentelemetry.io/obi/pkg/internal/ebpf/kafkaparser"
"go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf"
"go.opentelemetry.io/obi/pkg/internal/largebuf"
"go.opentelemetry.io/obi/pkg/pipe/msg"
)

Expand Down Expand Up @@ -174,7 +175,7 @@ type EBPFParseContext struct {
h2c *lru.Cache[uint64, h2Connection]
redisDBCache *simplelru.LRU[BpfConnectionInfoT, int]
couchbaseBucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]
largeBuffers *expirable.LRU[largeBufferKey, *LargeBuffer]
largeBuffers *expirable.LRU[largeBufferKey, *largebuf.LargeBuffer]
mongoRequestCache PendingMongoDBRequests
mysqlPreparedStatements *simplelru.LRU[mysqlPreparedStatementsKey, string]
postgresPreparedStatements *simplelru.LRU[postgresPreparedStatementsKey, string]
Expand Down Expand Up @@ -213,7 +214,7 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request.
)

h2c, _ := lru.New[uint64, h2Connection](1024 * 10)
largeBuffers := expirable.NewLRU[largeBufferKey, *LargeBuffer](1024, nil, 5*time.Minute)
largeBuffers := expirable.NewLRU[largeBufferKey, *largebuf.LargeBuffer](1024, nil, 5*time.Minute)

if cfg != nil {
protocolDebug = cfg.ProtocolDebug
Expand Down
3 changes: 2 additions & 1 deletion pkg/ebpf/common/couchbase_detect_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/obi/pkg/appolly/app"
"go.opentelemetry.io/obi/pkg/appolly/app/request"
"go.opentelemetry.io/obi/pkg/internal/ebpf/couchbasekv"
"go.opentelemetry.io/obi/pkg/internal/largebuf"
)

// CouchbaseInfo holds parsed Couchbase memcached binary protocol information.
Expand All @@ -31,7 +32,7 @@ type CouchbaseInfo struct {
// ProcessPossibleCouchbaseEvent attempts to parse the event as a Couchbase memcached binary protocol event.
// Returns a slice of CouchbaseInfo if successful, along with a boolean indicating if the event should be ignored,
// and an error if parsing failed. Multiple packets may be present in a single TCP segment due to pipelining.
func ProcessPossibleCouchbaseEvent(event *TCPRequestInfo, requestBuf *LargeBuffer, responseBuf *LargeBuffer, bucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]) (*CouchbaseInfo, bool, error) {
func ProcessPossibleCouchbaseEvent(event *TCPRequestInfo, requestBuf *largebuf.LargeBuffer, responseBuf *largebuf.LargeBuffer, bucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]) (*CouchbaseInfo, bool, error) {
reqRaw := requestBuf.UnsafeView()
respRaw := responseBuf.UnsafeView()
info, ignore, err := processCouchbaseEvent(event.ConnInfo, reqRaw, respRaw, bucketCache)
Expand Down
17 changes: 9 additions & 8 deletions pkg/ebpf/common/couchbase_detect_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"go.opentelemetry.io/obi/pkg/appolly/app/request"
"go.opentelemetry.io/obi/pkg/internal/ebpf/couchbasekv"
"go.opentelemetry.io/obi/pkg/internal/largebuf"
)

// Helper functions to create Couchbase packets for testing
Expand Down Expand Up @@ -551,7 +552,7 @@ func TestProcessPossibleCouchbaseEventReversedBuffers(t *testing.T) {
}

// Test with buffers in correct order
info, ignore, err := ProcessPossibleCouchbaseEvent(event, NewLargeBufferFrom(requestBuf), NewLargeBufferFrom(responseBuf), cache)
info, ignore, err := ProcessPossibleCouchbaseEvent(event, largebuf.NewLargeBufferFrom(requestBuf), largebuf.NewLargeBufferFrom(responseBuf), cache)
require.NoError(t, err)
assert.False(t, ignore)
assert.Equal(t, "GET", info.Operation)
Expand All @@ -563,7 +564,7 @@ func TestProcessPossibleCouchbaseEventReversedBuffers(t *testing.T) {
ConnInfo: connInfo,
Direction: 1,
}
info, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(garbageBuf), NewLargeBufferFrom(requestBuf), cache)
info, ignore, err = ProcessPossibleCouchbaseEvent(event2, largebuf.NewLargeBufferFrom(garbageBuf), largebuf.NewLargeBufferFrom(requestBuf), cache)
require.NoError(t, err)
assert.False(t, ignore)
require.NotNil(t, info)
Expand Down Expand Up @@ -594,7 +595,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {
selectBucket1Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeSelectBucket, couchbasekv.StatusSuccess, "")

event1 := &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1}
_, ignore, err := ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(selectBucket1Req), NewLargeBufferFrom(selectBucket1Resp), cache)
_, ignore, err := ProcessPossibleCouchbaseEvent(event1, largebuf.NewLargeBufferFrom(selectBucket1Req), largebuf.NewLargeBufferFrom(selectBucket1Resp), cache)
require.NoError(t, err)
assert.True(t, ignore) // SELECT_BUCKET is ignored

Expand All @@ -603,7 +604,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {
selectBucket2Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeSelectBucket, couchbasekv.StatusSuccess, "")

event2 := &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1}
_, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(selectBucket2Req), NewLargeBufferFrom(selectBucket2Resp), cache)
_, ignore, err = ProcessPossibleCouchbaseEvent(event2, largebuf.NewLargeBufferFrom(selectBucket2Req), largebuf.NewLargeBufferFrom(selectBucket2Resp), cache)
require.NoError(t, err)
assert.True(t, ignore)

Expand All @@ -612,7 +613,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {
getCollID1Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeCollectionsGetID, couchbasekv.StatusSuccess, "")

event1 = &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1}
_, ignore, err = ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(getCollID1Req), NewLargeBufferFrom(getCollID1Resp), cache)
_, ignore, err = ProcessPossibleCouchbaseEvent(event1, largebuf.NewLargeBufferFrom(getCollID1Req), largebuf.NewLargeBufferFrom(getCollID1Resp), cache)
require.NoError(t, err)
assert.True(t, ignore)

Expand All @@ -621,7 +622,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {
getCollID2Resp := makeCouchbaseResponsePacket(couchbasekv.OpcodeCollectionsGetID, couchbasekv.StatusSuccess, "")

event2 = &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1}
_, ignore, err = ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(getCollID2Req), NewLargeBufferFrom(getCollID2Resp), cache)
_, ignore, err = ProcessPossibleCouchbaseEvent(event2, largebuf.NewLargeBufferFrom(getCollID2Req), largebuf.NewLargeBufferFrom(getCollID2Resp), cache)
require.NoError(t, err)
assert.True(t, ignore)

Expand All @@ -631,7 +632,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {

// GET on connection 1 should have bucket1/scope1/coll1
event1 = &TCPRequestInfo{ConnInfo: connInfo1, Direction: 1}
info1, ignore, err := ProcessPossibleCouchbaseEvent(event1, NewLargeBufferFrom(getReq), NewLargeBufferFrom(getResp), cache)
info1, ignore, err := ProcessPossibleCouchbaseEvent(event1, largebuf.NewLargeBufferFrom(getReq), largebuf.NewLargeBufferFrom(getResp), cache)
require.NoError(t, err)
assert.False(t, ignore)
require.NotNil(t, info1)
Expand All @@ -641,7 +642,7 @@ func TestProcessPossibleCouchbaseEventConnectionIsolation(t *testing.T) {

// GET on connection 2 should have bucket2/scope2/coll2
event2 = &TCPRequestInfo{ConnInfo: connInfo2, Direction: 1}
info2, ignore, err := ProcessPossibleCouchbaseEvent(event2, NewLargeBufferFrom(getReq), NewLargeBufferFrom(getResp), cache)
info2, ignore, err := ProcessPossibleCouchbaseEvent(event2, largebuf.NewLargeBufferFrom(getReq), largebuf.NewLargeBufferFrom(getResp), cache)
require.NoError(t, err)
assert.False(t, ignore)
require.NotNil(t, info2)
Expand Down
7 changes: 4 additions & 3 deletions pkg/ebpf/common/fast_cgi_detect_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"go.opentelemetry.io/obi/pkg/appolly/app"
"go.opentelemetry.io/obi/pkg/appolly/app/request"
"go.opentelemetry.io/obi/pkg/internal/largebuf"
)

const (
Expand Down Expand Up @@ -108,14 +109,14 @@ func parseCGITable(b []byte) map[string]string {
return res
}

func maybeFastCGI(b *LargeBuffer) bool {
func maybeFastCGI(b *largebuf.LargeBuffer) bool {
if b.Len() <= fastCGIRequestHeaderLen {
return false
}
return bytes.Contains(b.UnsafeView(), []byte(requestMethodKey))
}

func parseHeader(b *LargeBuffer) ([]byte, error) {
func parseHeader(b *largebuf.LargeBuffer) ([]byte, error) {
r := b.NewReader()
for {
if r.Remaining() < fastCGIRequestHeaderLen {
Expand Down Expand Up @@ -144,7 +145,7 @@ func parseHeader(b *LargeBuffer) ([]byte, error) {
}
}

func detectFastCGI(b, rb *LargeBuffer) (string, string, int) {
func detectFastCGI(b, rb *largebuf.LargeBuffer) (string, string, int) {
raw, err := parseHeader(b)
if err != nil {
return "", "", -1
Expand Down
6 changes: 4 additions & 2 deletions pkg/ebpf/common/fast_cgi_detect_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/obi/pkg/internal/largebuf"
)

func TestMaybeFastCGI(t *testing.T) {
Expand Down Expand Up @@ -45,7 +47,7 @@ func TestMaybeFastCGI(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ilen := min(len(tt.input), tt.inputLen)
res := maybeFastCGI(NewLargeBufferFrom(tt.input[0:ilen]))
res := maybeFastCGI(largebuf.NewLargeBufferFrom(tt.input[0:ilen]))
assert.Equal(t, tt.expected, res)
})
}
Expand Down Expand Up @@ -214,7 +216,7 @@ func TestDetectFastCGI(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ilen := min(len(tt.input), tt.inputLen)
olen := min(len(tt.output), tt.outputLen)
method, path, status := detectFastCGI(NewLargeBufferFrom(tt.input[0:ilen]), NewLargeBufferFrom(tt.output[0:olen]))
method, path, status := detectFastCGI(largebuf.NewLargeBufferFrom(tt.input[0:ilen]), largebuf.NewLargeBufferFrom(tt.output[0:olen]))
assert.Equal(t, tt.expectedMethod, method)
assert.Equal(t, tt.expectedPath, path)
assert.Equal(t, tt.expectedResult, status)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ebpf/common/go_kafka_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/obi/pkg/appolly/app"
"go.opentelemetry.io/obi/pkg/appolly/app/request"
"go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf"
"go.opentelemetry.io/obi/pkg/internal/largebuf"
)

func ReadGoSaramaRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, error) {
Expand All @@ -19,7 +20,7 @@ func ReadGoSaramaRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, er
return request.Span{}, true, err
}

info, ignore, err := ProcessKafkaRequest(NewLargeBufferFrom(event.Buf[:]).NewReader(), nil)
info, ignore, err := ProcessKafkaRequest(largebuf.NewLargeBufferFrom(event.Buf[:]), nil)

if err == nil && !ignore {
return GoKafkaSaramaToSpan(event, info), false, nil
Expand Down
6 changes: 4 additions & 2 deletions pkg/ebpf/common/http2grpc_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/obi/pkg/appolly/app/request"
"go.opentelemetry.io/obi/pkg/internal/ebpf/bhpack"
"go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf"
"go.opentelemetry.io/obi/pkg/internal/largebuf"
)

type BPFHTTP2Info BpfHttp2GrpcRequestT
Expand Down Expand Up @@ -544,7 +545,7 @@ func isLikelyHTTP2(data []uint8, eventLen int) bool {
return false
}

func isHTTP2(data *LargeBuffer, eventLen int) bool {
func isHTTP2(data *largebuf.LargeBuffer, eventLen int) bool {
// Parsing HTTP2 frames with the Go HTTP2/gRPC parser is very expensive.
// Therefore, we replicate some of our HTTP2 frame reader from eBPF here to
// check if this payload even remotely looks like HTTP2/gRPC, e.g. we must
Expand All @@ -553,7 +554,8 @@ func isHTTP2(data *LargeBuffer, eventLen int) bool {
return false
}

framer := http2.NewFramer(io.Discard, data.NewReader())
dataReader := data.NewReader()
framer := http2.NewFramer(io.Discard, &dataReader)

for {
f, err := framer.ReadFrame()
Expand Down
5 changes: 3 additions & 2 deletions pkg/ebpf/common/http2grpc_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"golang.org/x/net/http2"

"go.opentelemetry.io/obi/pkg/internal/ebpf/bhpack"
"go.opentelemetry.io/obi/pkg/internal/largebuf"
)

var isHTTP2TestCases = []struct {
Expand Down Expand Up @@ -117,7 +118,7 @@ func TestHTTP2QuickDetection(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
res := isLikelyHTTP2(tt.input, tt.inputLen)
assert.Equal(t, tt.expectedQuick, res)
res1 := isHTTP2(NewLargeBufferFrom(tt.input), tt.inputLen)
res1 := isHTTP2(largebuf.NewLargeBufferFrom(tt.input), tt.inputLen)
assert.Equal(t, tt.expected, res1)
})
}
Expand Down Expand Up @@ -535,7 +536,7 @@ func TestHandleHeaderField(t *testing.T) {
func BenchmarkIsHTTP2(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, tt := range isHTTP2TestCases {
_ = isHTTP2(NewLargeBufferFrom(tt.input), tt.inputLen)
_ = isHTTP2(largebuf.NewLargeBufferFrom(tt.input), tt.inputLen)
}
}
}
22 changes: 13 additions & 9 deletions pkg/ebpf/common/http_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/obi/pkg/appolly/app/request"
ebpfhttp "go.opentelemetry.io/obi/pkg/ebpf/common/http"
"go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf"
"go.opentelemetry.io/obi/pkg/internal/largebuf"
)

func removeQuery(url string) string {
Expand Down Expand Up @@ -191,7 +192,7 @@ func ReadHTTPInfoIntoSpan(parseCtx *EBPFParseContext, record *ringbuf.Record, fi

func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (request.Span, bool, error) {
var (
requestBuffer, responseBuffer *LargeBuffer
requestBuffer, responseBuffer *largebuf.LargeBuffer
hasResponse bool
isClient = isClientEvent(event.Type)
)
Expand All @@ -204,7 +205,7 @@ func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (reques
requestBuffer = b
} else {
slog.Debug("missing large buffer for HTTP request", "traceID", event.Tp.TraceId, "conn", event.ConnInfo, "packetType", packetTypeRequest)
requestBuffer = NewLargeBufferFrom(event.Buf[:])
requestBuffer = largebuf.NewLargeBufferFrom(event.Buf[:])
}

b, ok = extractTCPLargeBuffer(parseCtx, event.Tp.TraceId, packetTypeResponse, directionByPacketType(packetTypeResponse, isClient), event.ConnInfo)
Expand All @@ -215,7 +216,7 @@ func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (reques
slog.Debug("missing large buffer for HTTP response", "traceID", event.Tp.TraceId, "conn", event.ConnInfo, "packetType", packetTypeResponse)
}
} else {
requestBuffer = NewLargeBufferFrom(event.Buf[:])
requestBuffer = largebuf.NewLargeBufferFrom(event.Buf[:])
}

if parseCtx != nil && !parseCtx.payloadExtraction.Enabled() {
Expand All @@ -230,7 +231,8 @@ func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (reques
}

// http.ReadRequest requires a *bufio.Reader; that one allocation is unavoidable.
req, err := http.ReadRequest(bufio.NewReader(requestBuffer.NewReader()))
reqReader := requestBuffer.NewReader()
req, err := http.ReadRequest(bufio.NewReader(&reqReader))
resp, err2 := httpSafeParseResponse(responseBuffer, req)
if err != nil || err2 != nil {
slog.Debug("error while parsing http request or response, falling back to manual HTTP info parsing", "reqErr", err, "respErr", err2)
Expand All @@ -243,19 +245,21 @@ func HTTPInfoEventToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo) (reques
// HTTP response buffers might have been sent incomplete, before the full body.
// Try to parse the original buffer first, if an EOF is encountered, append an empty
// body to the buffer and try again.
func httpSafeParseResponse(responseBuffer *LargeBuffer, req *http.Request) (*http.Response, error) {
rd := bufio.NewReader(responseBuffer.NewReader())
func httpSafeParseResponse(responseBuffer *largebuf.LargeBuffer, req *http.Request) (*http.Response, error) {
r := responseBuffer.NewReader()
rd := bufio.NewReader(&r)
resp, err := http.ReadResponse(rd, req)
if err != nil && errors.Is(err, io.ErrUnexpectedEOF) {
// Append empty body terminator and retry with a fresh reader.
// Append empty body terminator and retry, reusing the same reader (preserves scratch).
responseBuffer.AppendChunk([]byte("\r\n\r\n"))
rd.Reset(responseBuffer.NewReader())
r.Reset()
rd.Reset(&r)
return http.ReadResponse(rd, req)
}
return resp, nil
}

func httpRequestToSpan(event *BPFHTTPInfo, requestBuffer *LargeBuffer) request.Span {
func httpRequestToSpan(event *BPFHTTPInfo, requestBuffer *largebuf.LargeBuffer) request.Span {
var (
result = HTTPInfo{BPFHTTPInfo: *event}
bufHost string
Expand Down
Loading