Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions interpreter/golabels/integrationtests/golabels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
//go:build integration

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package integrationtests

import (
"context"
"math"
"math/rand"
"os"
"runtime/debug"
"runtime/pprof"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/ebpf-profiler/host"
"go.opentelemetry.io/ebpf-profiler/libpf"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/tracer"
tracertypes "go.opentelemetry.io/ebpf-profiler/tracer/types"
)

type mockIntervals struct{}

func (mockIntervals) MonitorInterval() time.Duration { return 1 * time.Second }
func (mockIntervals) TracePollInterval() time.Duration { return 250 * time.Millisecond }
func (mockIntervals) PIDCleanupInterval() time.Duration { return 1 * time.Second }

type mockReporter struct{}

func (mockReporter) ExecutableKnown(_ libpf.FileID) bool { return true }
func (mockReporter) ExecutableMetadata(_ *reporter.ExecutableMetadataArgs) {}

func isRoot() bool {
return os.Geteuid() == 0
}

//nolint:gosec
func randomString(n int) string {
letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
s := make([]rune, n)
for i := range s {
s[i] = letters[rand.Intn(len(letters))]
}
return string(s)
}

func setPprofLabels(t *testing.T, ctx context.Context, cookie string, busyFunc func()) {
t.Helper()
labels := pprof.Labels(
"l1"+cookie, "label1"+randomString(16),
"l2"+cookie, "label2"+randomString(24),
"l3"+cookie, "label3"+randomString(48))
lastUpdate := time.Now()
pprof.Do(context.TODO(), labels, func(context.Context) {
for time.Since(lastUpdate) < 10*time.Second {
// CPU go burr on purpose.
busyFunc()
if ctx.Err() != nil {
return
}
}
})
}

func Test_Golabels(t *testing.T) {
if !isRoot() {
t.Skip("root privileges required")
}

buildInfo, ok := debug.ReadBuildInfo()
if !ok {
t.Fatalf("Failed to get build info")
}

withCGO := false
for _, setting := range buildInfo.Settings {
if setting.Key == "CGO_ENABLED" {
withCGO = true
}
}
t.Logf("CGo is enabled: %t", withCGO)

cookie := buildInfo.GoVersion

t.Run(cookie, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

enabledTracers, _ := tracertypes.Parse("")
enabledTracers.Enable(tracertypes.Labels)
enabledTracers.Enable(tracertypes.GoTracer)

trc, err := tracer.NewTracer(ctx, &tracer.Config{
Reporter: &mockReporter{},

Check failure on line 99 in interpreter/golabels/integrationtests/golabels_test.go

View workflow job for this annotation

GitHub Actions / Lint (amd64)

cannot use &mockReporter{} (value of type *mockReporter) as reporter.SymbolReporter value in struct literal: *mockReporter does not implement reporter.SymbolReporter (missing method FrameKnown)

Check failure on line 99 in interpreter/golabels/integrationtests/golabels_test.go

View workflow job for this annotation

GitHub Actions / Lint (arm64)

cannot use &mockReporter{} (value of type *mockReporter) as reporter.SymbolReporter value in struct literal: *mockReporter does not implement reporter.SymbolReporter (missing method FrameKnown)
Intervals: &mockIntervals{},
IncludeTracers: enabledTracers,
SamplesPerSecond: 20,
ProbabilisticInterval: 100,
ProbabilisticThreshold: 100,
OffCPUThreshold: uint32(math.MaxUint32 / 100),
VerboseMode: true,

Check failure on line 106 in interpreter/golabels/integrationtests/golabels_test.go

View workflow job for this annotation

GitHub Actions / Lint (amd64)

unknown field VerboseMode in struct literal of type tracer.Config

Check failure on line 106 in interpreter/golabels/integrationtests/golabels_test.go

View workflow job for this annotation

GitHub Actions / Lint (arm64)

unknown field VerboseMode in struct literal of type tracer.Config
})
require.NoError(t, err)

trc.StartPIDEventProcessor(ctx)

err = trc.AttachTracer()
require.NoError(t, err)

t.Log("Attached tracer program")

err = trc.EnableProfiling()
require.NoError(t, err)

err = trc.AttachSchedMonitor()
require.NoError(t, err)

traceCh := make(chan *host.Trace)

err = trc.StartMapMonitors(ctx, traceCh)
require.NoError(t, err)

go setPprofLabels(t, ctx, cookie, busyFunc)

Check failure on line 128 in interpreter/golabels/integrationtests/golabels_test.go

View workflow job for this annotation

GitHub Actions / Lint (amd64)

undefined: busyFunc (typecheck)

Check failure on line 128 in interpreter/golabels/integrationtests/golabels_test.go

View workflow job for this annotation

GitHub Actions / Lint (arm64)

undefined: busyFunc (typecheck)

for trace := range traceCh {
if trace == nil {
continue
}
if len(trace.CustomLabels) > 0 {
hits := 0
for k, v := range trace.CustomLabels {
switch k {
case "l1" + cookie:
require.Len(t, v, 22)
require.True(t, strings.HasPrefix(v, "label1"))
hits |= (1 << 0)
case "l2" + cookie:
require.Len(t, v, 30)
require.True(t, strings.HasPrefix(v, "label2"))
hits |= (1 << 1)
case "l3" + cookie:
require.Len(t, v, 47)
require.True(t, strings.HasPrefix(v, "label3"))
hits |= (1 << 2)
}
}
if hits == (1<<0 | 1<<1 | 1<<2) {
cancel()
break
}
}
}
})
}
28 changes: 9 additions & 19 deletions interpreter/instancestubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,24 @@ import (
"go.opentelemetry.io/ebpf-profiler/tpbase"
)

// ObserverStubs provides empty implementations of Observer hooks that are
// not mandatory for an Observer implementation.
type ObserverStubs struct {
}

func (os *ObserverStubs) Detach(EbpfHandler, libpf.PID) error {
return nil
// InstanceStubs provides empty implementations of Instance hooks that are
// not mandatory for a Instance implementation.
type InstanceStubs struct {
}

func (os *ObserverStubs) SynchronizeMappings(EbpfHandler, reporter.SymbolReporter, process.Process,
func (is *InstanceStubs) SynchronizeMappings(EbpfHandler, reporter.SymbolReporter, process.Process,
[]process.Mapping) error {
return nil
}

func (os *ObserverStubs) UpdateTSDInfo(EbpfHandler, libpf.PID, tpbase.TSDInfo) error {
func (is *InstanceStubs) UpdateTSDInfo(EbpfHandler, libpf.PID, tpbase.TSDInfo) error {
return nil
}

func (os *ObserverStubs) GetAndResetMetrics() ([]metrics.Metric, error) {
return []metrics.Metric{}, nil
}

// InstanceStubs provides empty implementations of Instance hooks that are
// not mandatory for a Instance implementation.
type InstanceStubs struct {
ObserverStubs
}

func (is *InstanceStubs) Symbolize(reporter.SymbolReporter, *host.Frame, *libpf.Trace) error {
return ErrMismatchInterpreterType
}

func (is *InstanceStubs) GetAndResetMetrics() ([]metrics.Metric, error) {
return []metrics.Metric{}, nil
}
144 changes: 144 additions & 0 deletions interpreter/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package interpreter // import "go.opentelemetry.io/ebpf-profiler/interpreter"

import (
"errors"

log "github.com/sirupsen/logrus"
"go.opentelemetry.io/ebpf-profiler/host"
"go.opentelemetry.io/ebpf-profiler/libpf"
"go.opentelemetry.io/ebpf-profiler/metrics"
"go.opentelemetry.io/ebpf-profiler/process"
"go.opentelemetry.io/ebpf-profiler/remotememory"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/tpbase"
)

// MultiData implements the Data interface for multiple interpreters.
type MultiData struct {
interpreters []Data
}

// NewMultiData creates a new MultiData instance from multiple Data instances.
func NewMultiData(interpreters []Data) *MultiData {
return &MultiData{
interpreters: interpreters,
}
}

// Attach attaches all interpreters and returns a MultiInstance.
func (m *MultiData) Attach(ebpf EbpfHandler, pid libpf.PID, bias libpf.Address,
rm remotememory.RemoteMemory) (Instance, error) {
var instances []Instance
var errs []error

for _, data := range m.interpreters {
instance, err := data.Attach(ebpf, pid, bias, rm)
if err != nil {
errs = append(errs, err)
continue
}
if instance != nil {
instances = append(instances, instance)
}
}

err := errors.Join(errs...)
if len(instances) == 0 {
// Either all interpreters returned nil instances without error (e.g., not ready yet)
// in which case return nil, nil (valid state) otherwise return combined error.
return nil, err
}

// We got at least one valid instance, log any errors that occurred
if err != nil {
log.Errorf("Errors occurred while attaching interpreters: %v", err)
}

return NewMultiInstance(instances), nil
}

// Unload unloads all interpreters.
func (m *MultiData) Unload(ebpf EbpfHandler) {
for _, data := range m.interpreters {
data.Unload(ebpf)
}
}

// MultiInstance implements the Instance interface for multiple interpreters.
type MultiInstance struct {
instances []Instance
}

// NewMultiInstance creates a new MultiInstance from multiple Instance instances.
func NewMultiInstance(instances []Instance) *MultiInstance {
return &MultiInstance{
instances: instances,
}
}

// Detach detaches all interpreter instances.
func (m *MultiInstance) Detach(ebpf EbpfHandler, pid libpf.PID) error {
var errs []error
for _, instance := range m.instances {
if err := instance.Detach(ebpf, pid); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

// SynchronizeMappings synchronizes mappings for all interpreter instances.
func (m *MultiInstance) SynchronizeMappings(ebpf EbpfHandler,
symbolReporter reporter.SymbolReporter, pr process.Process, mappings []process.Mapping) error {
var errs []error
for _, instance := range m.instances {
if err := instance.SynchronizeMappings(ebpf, symbolReporter, pr, mappings); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

// UpdateTSDInfo updates TSD info for all interpreter instances.
func (m *MultiInstance) UpdateTSDInfo(ebpf EbpfHandler, pid libpf.PID, info tpbase.TSDInfo) error {
var errs []error
for _, instance := range m.instances {
if err := instance.UpdateTSDInfo(ebpf, pid, info); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

// Symbolize tries to symbolize the frame with each interpreter instance until one succeeds.
func (m *MultiInstance) Symbolize(symbolReporter reporter.SymbolReporter, frame *host.Frame,
trace *libpf.Trace) error {
// Try each interpreter in order
for _, instance := range m.instances {
err := instance.Symbolize(symbolReporter, frame, trace)
if err != ErrMismatchInterpreterType {
return err
}
}
return ErrMismatchInterpreterType
}

// GetAndResetMetrics collects metrics from all interpreter instances.
func (m *MultiInstance) GetAndResetMetrics() ([]metrics.Metric, error) {
var allMetrics []metrics.Metric
var errs []error

for _, instance := range m.instances {
metrics, err := instance.GetAndResetMetrics()
if err != nil {
errs = append(errs, err)
continue
}
allMetrics = append(allMetrics, metrics...)
}

return allMetrics, errors.Join(errs...)
}
24 changes: 8 additions & 16 deletions interpreter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,14 @@ type Data interface {
Unload(ebpf EbpfHandler)
}

// Observer is the base interface for observing per-PID data without symbolization.
// This interface is useful for components that need to observe processes without
// providing frame symbolization capabilities.
type Observer interface {
// Instance is the interface to operate on per-PID data.
type Instance interface {
// Detach removes any information from the ebpf maps. The pid is given as argument so
// simple observers can use the global Data also as the Observer implementation.
// simple interpreters can use the global Data also as the Instance implementation.
Detach(ebpf EbpfHandler, pid libpf.PID) error

// SynchronizeMappings is called when the processmanager has reread process memory
// mappings. Observers not needing to process these events can simply ignore them
// mappings. Interpreters not needing to process these events can simply ignore them
// by just returning a nil.
SynchronizeMappings(ebpf EbpfHandler, symbolReporter reporter.SymbolReporter,
pr process.Process, mappings []process.Mapping) error
Expand All @@ -148,19 +146,13 @@ type Observer interface {
// introspection data has been updated.
UpdateTSDInfo(ebpf EbpfHandler, pid libpf.PID, info tpbase.TSDInfo) error

// GetAndResetMetrics collects the metrics from the Observer and resets
// the counters to their initial value.
GetAndResetMetrics() ([]metrics.Metric, error)
}

// Instance is the interface to operate on per-PID data with symbolization support.
// It extends Observer with the ability to symbolize frames.
type Instance interface {
Observer

// Symbolize requests symbolization of the given frame, and dispatches this symbolization
// to the collection agent. The frame's contents (frame type, file ID and line number)
// are appended to newTrace.
Symbolize(symbolReporter reporter.SymbolReporter, frame *host.Frame,
trace *libpf.Trace) error

// GetAndResetMetrics collects the metrics from the Instance and resets
// the counters to their initial value.
GetAndResetMetrics() ([]metrics.Metric, error)
}
Loading
Loading