Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 9 additions & 25 deletions interpreter/apmint/apmint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
package apmint // import "go.opentelemetry.io/ebpf-profiler/interpreter/apmint"

import (
"encoding/hex"
"errors"
"fmt"
"hash/fnv"
"regexp"
"strconv"
"unsafe"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -145,46 +144,31 @@ type Instance struct {

var _ interpreter.Instance = &Instance{}

// hashTrace calculates the hash of a trace and returns it.
// Be aware that changes to this calculation will break the ability to
// look backwards for the same TraceHash in our backend.
func hashTrace(trace *libpf.Trace) (hash [16]byte) {
var buf [24]byte
h := fnv.New128a()
for _, uniqueFrame := range trace.Frames {
frame := uniqueFrame.Value()
_, _ = h.Write(frame.FileID.Bytes())
// Using FormatUint() or putting AppendUint() into a function leads
// to escaping to heap (allocation).
_, _ = h.Write(strconv.AppendUint(buf[:0], uint64(frame.AddressOrLineno), 10))
}
h.Sum(hash[:0])
return
}

// Detach implements the interpreter.Instance interface.
func (i *Instance) Detach(ebpf interpreter.EbpfHandler, pid libpf.PID) error {
return ebpf.DeleteProcData(libpf.APMInt, pid)
}

// NotifyAPMAgent sends out collected traces to the connected APM agent.
func (i *Instance) NotifyAPMAgent(pid libpf.PID, rawTrace *host.Trace, umTrace *libpf.Trace) {
func (i *Instance) NotifyAPMAgent(
pid libpf.PID, rawTrace *host.Trace, umTraceHash libpf.TraceHash, count uint16) {
if rawTrace.APMTransactionID == libpf.InvalidAPMSpanID || i.socket == nil {
return
}

log.Debugf("Reporting %dx trace hash %s -> TX %s for PID %d",
count, umTraceHash.StringNoQuotes(),
hex.EncodeToString(rawTrace.APMTransactionID[:]), pid)

msg := traceCorrMsg{
MessageType: 1,
MinorVersion: 1,
APMTraceID: rawTrace.APMTraceID,
APMTransactionID: rawTrace.APMTransactionID,
StackTraceID: hashTrace(umTrace),
Count: 1,
StackTraceID: umTraceHash,
Count: count,
}

log.Debugf("Reporting a trace hash %x -> TX %x for PID %d",
msg.StackTraceID[:], rawTrace.APMTransactionID[:], pid)

if err := i.socket.SendMessage(msg.Serialize()); err != nil {
log.Debugf("Failed to send trace mappings to APM agent: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions interpreter/apmint/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type traceCorrMsg struct {
MinorVersion uint16
APMTraceID libpf.APMTraceID
APMTransactionID libpf.APMTransactionID
StackTraceID [16]byte
StackTraceID libpf.TraceHash
Count uint16
}

Expand All @@ -113,7 +113,7 @@ func (m *traceCorrMsg) Serialize() []byte {
_ = binary.Write(&buf, binary.LittleEndian, m.MinorVersion)
_, _ = buf.Write(m.APMTraceID[:])
_, _ = buf.Write(m.APMTransactionID[:])
_, _ = buf.Write(m.StackTraceID[:])
_, _ = buf.Write(m.StackTraceID.Bytes())
_ = binary.Write(&buf, binary.LittleEndian, m.Count)
return buf.Bytes()
}
Expand Down
1 change: 1 addition & 0 deletions libpf/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (frames *Frames) Append(frame *Frame) {
// Trace represents a stack trace.
type Trace struct {
Frames Frames
Hash TraceHash
CustomLabels map[string]string
}

Expand Down
60 changes: 60 additions & 0 deletions libpf/tracehash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf"

import (
"encoding"
"encoding/base64"

"go.opentelemetry.io/ebpf-profiler/libpf/basehash"
)

// TraceHash represents the unique hash of a trace
type TraceHash struct {
basehash.Hash128
}

func NewTraceHash(hi, lo uint64) TraceHash {
return TraceHash{basehash.New128(hi, lo)}
}

// TraceHashFromBytes parses a byte slice of a trace hash into the internal data representation.
func TraceHashFromBytes(b []byte) (TraceHash, error) {
h, err := basehash.New128FromBytes(b)
if err != nil {
return TraceHash{}, err
}
return TraceHash{h}, nil
}

func (h TraceHash) Equal(other TraceHash) bool {
return h.Hash128.Equal(other.Hash128)
}

func (h TraceHash) Less(other TraceHash) bool {
return h.Hash128.Less(other.Hash128)
}

// EncodeTo encodes the hash into the base64 encoded representation
// and stores it in the provided destination byte array.
// The length of the destination must be at least EncodedLen().
func (h TraceHash) EncodeTo(dst []byte) {
base64.RawURLEncoding.Encode(dst, h.Bytes())
}

// EncodedLen returns the length of the hash's base64 representation.
func (TraceHash) EncodedLen() int {
// TraceHash is 16 bytes long, the base64 representation is one base64 byte per 6 bits.
return ((16)*8)/6 + 1
}

// Hash32 returns a 32 bits hash of the input.
// It's main purpose is to be used for LRU caching.
func (h TraceHash) Hash32() uint32 {
return uint32(h.Lo())
}

// Compile-time interface checks
var _ encoding.TextUnmarshaler = (*TraceHash)(nil)
var _ encoding.TextMarshaler = (*TraceHash)(nil)
83 changes: 83 additions & 0 deletions libpf/tracehash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package libpf

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTraceHashSprintf(t *testing.T) {
origHash := NewTraceHash(0x0001C03F8D6B8520, 0xEDEAEEA9460BEEBB)

marshaled := fmt.Sprintf("%d", origHash)
//nolint:goconst
expected := "{492854164817184 17143777342331285179}"
assert.Equal(t, expected, marshaled)

marshaled = fmt.Sprintf("%s", origHash)
expected = "{%!s(uint64=492854164817184) %!s(uint64=17143777342331285179)}"
assert.Equal(t, expected, marshaled)

marshaled = fmt.Sprintf("%v", origHash)
//nolint:goconst
expected = "{492854164817184 17143777342331285179}"
assert.Equal(t, expected, marshaled)

marshaled = fmt.Sprintf("%#v", origHash)
expected = "0x1c03f8d6b8520edeaeea9460beebb"
assert.Equal(t, expected, marshaled)

// Values were chosen to test non-zero-padded output
traceHash := NewTraceHash(42, 100)

marshaled = fmt.Sprintf("%x", traceHash)
expected = "2a0000000000000064"
assert.Equal(t, expected, marshaled)

marshaled = fmt.Sprintf("%X", traceHash)
expected = "2A0000000000000064"
assert.Equal(t, expected, marshaled)

marshaled = fmt.Sprintf("%#x", traceHash)
expected = "0x2a0000000000000064"
assert.Equal(t, expected, marshaled)

marshaled = fmt.Sprintf("%#X", traceHash)
expected = "0x2A0000000000000064"
assert.Equal(t, expected, marshaled)
}

func TestTraceHashMarshal(t *testing.T) {
origHash := NewTraceHash(0x600DF00D, 0xF00D600D)

// Test (Un)MarshalJSON
data, err := origHash.MarshalJSON()
require.NoError(t, err)

marshaled := string(data)
expected := "\"00000000600df00d00000000f00d600d\""
assert.Equal(t, expected, marshaled)

var jsonHash TraceHash
err = jsonHash.UnmarshalJSON(data)
require.NoError(t, err)
assert.Equal(t, origHash, jsonHash)

// Test (Un)MarshalText
data, err = origHash.MarshalText()
require.NoError(t, err)

marshaled = string(data)
expected = "00000000600df00d00000000f00d600d"
assert.Equal(t, expected, marshaled)

var textHash TraceHash
err = textHash.UnmarshalText(data)
require.NoError(t, err)
assert.Equal(t, origHash, textHash)
}
7 changes: 5 additions & 2 deletions processmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/times"
"go.opentelemetry.io/ebpf-profiler/tracer/types"
"go.opentelemetry.io/ebpf-profiler/traceutil"
"go.opentelemetry.io/ebpf-profiler/util"
)

Expand Down Expand Up @@ -295,10 +296,12 @@ func (pm *ProcessManager) ConvertTrace(trace *host.Trace) (newTrace *libpf.Trace
}
}
}
newTrace.Hash = traceutil.HashTrace(newTrace)
return newTrace
}

func (pm *ProcessManager) MaybeNotifyAPMAgent(rawTrace *host.Trace, umTrace *libpf.Trace) string {
func (pm *ProcessManager) MaybeNotifyAPMAgent(
rawTrace *host.Trace, umTraceHash libpf.TraceHash, count uint16) string {
pm.mu.RLock()
pidInterp, ok := pm.interpreters[rawTrace.PID]
pm.mu.RUnlock()
Expand All @@ -309,7 +312,7 @@ func (pm *ProcessManager) MaybeNotifyAPMAgent(rawTrace *host.Trace, umTrace *lib
var serviceName string
for _, mapping := range pidInterp {
if apm, ok := mapping.(*apmint.Instance); ok {
apm.NotifyAPMAgent(rawTrace.PID, rawTrace, umTrace)
apm.NotifyAPMAgent(rawTrace.PID, rawTrace, umTraceHash, count)

if serviceName != "" {
log.Warnf("Overwriting APM service name from '%s' to '%s' for PID %d",
Expand Down
82 changes: 82 additions & 0 deletions processmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/support"
tracertypes "go.opentelemetry.io/ebpf-profiler/tracer/types"
"go.opentelemetry.io/ebpf-profiler/traceutil"
"go.opentelemetry.io/ebpf-profiler/util"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -262,6 +263,87 @@ func (s *symbolReporterMockup) ExecutableMetadata(_ *reporter.ExecutableMetadata

var _ reporter.SymbolReporter = (*symbolReporterMockup)(nil)

func TestInterpreterConvertTrace(t *testing.T) {
partialNativeFrameFileID := uint64(0xabcdbeef)
nativeFrameLineno := libpf.AddressOrLineno(0x1234)

pythonAndNativeTrace := &host.Trace{
Frames: []host.Frame{{
// This represents a native frame
File: host.FileID(partialNativeFrameFileID),
Lineno: nativeFrameLineno,
Type: libpf.NativeFrame,
}, {
File: host.FileID(42),
Lineno: libpf.AddressOrLineno(0x13e1bb8e), // same as runForeverTrace
Type: libpf.PythonFrame,
}},
}

tests := map[string]struct {
trace *host.Trace
expect *libpf.Trace
}{
"Convert Trace": {
trace: pythonAndNativeTrace,
expect: getExpectedTrace(pythonAndNativeTrace,
[]libpf.AddressOrLineno{0, 1}),
},
}

for name, testcase := range tests {
t.Run(name, func(t *testing.T) {
mapper := NewMapFileIDMapper()
for i, f := range testcase.trace.Frames {
mapper.Set(f.File, testcase.expect.Frames[i].Value().FileID)
}

// For this test do not include interpreters.
noIinterpreters, _ := tracertypes.Parse("")

ctx, cancel := context.WithCancel(t.Context())
defer cancel()

// To test ConvertTrace we do not require all parts of processmanager.
manager, err := New(ctx,
noIinterpreters,
1*time.Second,
nil,
nil,
&symbolReporterMockup{},
nil,
true,
libpf.Set[string]{})
require.NoError(t, err)

newTrace := manager.ConvertTrace(testcase.trace)

testcase.expect.Hash = traceutil.HashTrace(testcase.expect)
if testcase.expect.Hash == newTrace.Hash {
assert.Equal(t, testcase.expect, newTrace)
}
})
}
}

// getExpectedTrace returns a new libpf trace that is based on the provided host trace, but
// with the linenos replaced by the provided values. This function is for generating an expected
// trace for tests below.
func getExpectedTrace(origTrace *host.Trace, linenos []libpf.AddressOrLineno) *libpf.Trace {
newTrace := &libpf.Trace{
Hash: libpf.NewTraceHash(uint64(origTrace.Hash), uint64(origTrace.Hash)),
}

for i, frame := range origTrace.Frames {
lineno := frame.Lineno
if linenos != nil {
lineno = linenos[i]
}
newTrace.AppendFrame(frame.Type, libpf.NewFileID(uint64(frame.File), 0), lineno)
}
return newTrace
}

func TestNewMapping(t *testing.T) {
tests := map[string]struct {
// newMapping holds the arguments that are passed to NewMapping() in the test.
Expand Down
1 change: 1 addition & 0 deletions reporter/base_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE

containerID := meta.ContainerID
key := samples.TraceAndMetaKey{
Hash: trace.Hash,
Comm: meta.Comm,
ProcessName: meta.ProcessName,
ExecutablePath: meta.ExecutablePath,
Expand Down
Loading
Loading