From 71c60becfcb405c8f3ec26642bb9dc00723cebc7 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Mon, 8 Sep 2025 15:36:18 -0400 Subject: [PATCH 1/3] Revert "Introduce observer abstraction pattern" This reverts commit 9f859b88e19bba20b48e322554823adff69697e1. --- interpreter/instancestubs.go | 28 +++---- interpreter/types.go | 24 ++---- processmanager/execinfomanager/manager.go | 53 +------------ processmanager/manager.go | 2 - processmanager/processinfo.go | 96 +---------------------- processmanager/types.go | 7 -- 6 files changed, 20 insertions(+), 190 deletions(-) diff --git a/interpreter/instancestubs.go b/interpreter/instancestubs.go index 62f1373b9..6fb415c60 100644 --- a/interpreter/instancestubs.go +++ b/interpreter/instancestubs.go @@ -12,34 +12,24 @@ import ( "go.opentelemetry.io/ebpf-profiler/tpbase" ) -// ObserverStubs provides empty implementations of Observer hooks that are -// not mandatory for an Observer implementation. -type ObserverStubs struct { -} - -func (os *ObserverStubs) Detach(EbpfHandler, libpf.PID) error { - return nil +// InstanceStubs provides empty implementations of Instance hooks that are +// not mandatory for a Instance implementation. +type InstanceStubs struct { } -func (os *ObserverStubs) SynchronizeMappings(EbpfHandler, reporter.SymbolReporter, process.Process, +func (is *InstanceStubs) SynchronizeMappings(EbpfHandler, reporter.SymbolReporter, process.Process, []process.Mapping) error { return nil } -func (os *ObserverStubs) UpdateTSDInfo(EbpfHandler, libpf.PID, tpbase.TSDInfo) error { +func (is *InstanceStubs) UpdateTSDInfo(EbpfHandler, libpf.PID, tpbase.TSDInfo) error { return nil } -func (os *ObserverStubs) GetAndResetMetrics() ([]metrics.Metric, error) { - return []metrics.Metric{}, nil -} - -// InstanceStubs provides empty implementations of Instance hooks that are -// not mandatory for a Instance implementation. -type InstanceStubs struct { - ObserverStubs -} - func (is *InstanceStubs) Symbolize(reporter.SymbolReporter, *host.Frame, *libpf.Trace) error { return ErrMismatchInterpreterType } + +func (is *InstanceStubs) GetAndResetMetrics() ([]metrics.Metric, error) { + return []metrics.Metric{}, nil +} diff --git a/interpreter/types.go b/interpreter/types.go index 732ab0405..2ee082882 100644 --- a/interpreter/types.go +++ b/interpreter/types.go @@ -130,16 +130,14 @@ type Data interface { Unload(ebpf EbpfHandler) } -// Observer is the base interface for observing per-PID data without symbolization. -// This interface is useful for components that need to observe processes without -// providing frame symbolization capabilities. -type Observer interface { +// Instance is the interface to operate on per-PID data. +type Instance interface { // Detach removes any information from the ebpf maps. The pid is given as argument so - // simple observers can use the global Data also as the Observer implementation. + // simple interpreters can use the global Data also as the Instance implementation. Detach(ebpf EbpfHandler, pid libpf.PID) error // SynchronizeMappings is called when the processmanager has reread process memory - // mappings. Observers not needing to process these events can simply ignore them + // mappings. Interpreters not needing to process these events can simply ignore them // by just returning a nil. SynchronizeMappings(ebpf EbpfHandler, symbolReporter reporter.SymbolReporter, pr process.Process, mappings []process.Mapping) error @@ -148,19 +146,13 @@ type Observer interface { // introspection data has been updated. UpdateTSDInfo(ebpf EbpfHandler, pid libpf.PID, info tpbase.TSDInfo) error - // GetAndResetMetrics collects the metrics from the Observer and resets - // the counters to their initial value. - GetAndResetMetrics() ([]metrics.Metric, error) -} - -// Instance is the interface to operate on per-PID data with symbolization support. -// It extends Observer with the ability to symbolize frames. -type Instance interface { - Observer - // Symbolize requests symbolization of the given frame, and dispatches this symbolization // to the collection agent. The frame's contents (frame type, file ID and line number) // are appended to newTrace. Symbolize(symbolReporter reporter.SymbolReporter, frame *host.Frame, trace *libpf.Trace) error + + // GetAndResetMetrics collects the metrics from the Instance and resets + // the counters to their initial value. + GetAndResetMetrics() ([]metrics.Metric, error) } diff --git a/processmanager/execinfomanager/manager.go b/processmanager/execinfomanager/manager.go index 78adcf89d..131a0e492 100644 --- a/processmanager/execinfomanager/manager.go +++ b/processmanager/execinfomanager/manager.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/ebpf-profiler/interpreter/hotspot" "go.opentelemetry.io/ebpf-profiler/interpreter/luajit" "go.opentelemetry.io/ebpf-profiler/interpreter/nodev8" - "go.opentelemetry.io/ebpf-profiler/interpreter/oomwatcher" "go.opentelemetry.io/ebpf-profiler/interpreter/perl" "go.opentelemetry.io/ebpf-profiler/interpreter/php" "go.opentelemetry.io/ebpf-profiler/interpreter/python" @@ -67,9 +66,6 @@ type ExecutableInfo struct { // instance belongs to was previously identified as an interpreter. Otherwise, // this field is nil. Data interpreter.Data - // Observers stores per-executable observer information. Multiple observers - // can be associated with a single executable. - Observers []interpreter.Data // TSDInfo stores TSD information if the executable is libc, otherwise nil. TSDInfo *tpbase.TSDInfo } @@ -148,12 +144,6 @@ func NewExecutableInfoManager( interpreterLoaders = append(interpreterLoaders, customlabels.Loader) } - // Initialize observer loaders - observerLoaders := []interpreter.Loader{ - oomwatcher.Loader, - // Additional observers can be added here - } - deferredFileIDs, err := lru.NewSynced[host.FileID, libpf.Void](deferredFileIDSize, func(id host.FileID) uint32 { return uint32(id) }) if err != nil { @@ -165,7 +155,6 @@ func NewExecutableInfoManager( sdp: sdp, state: xsync.NewRWMutex(executableInfoManagerState{ interpreterLoaders: interpreterLoaders, - observerLoaders: observerLoaders, executables: map[host.FileID]*entry{}, unwindInfoIndex: map[sdtypes.UnwindInfo]uint16{}, ebpf: ebpf, @@ -241,9 +230,8 @@ func (mgr *ExecutableInfoManager) AddOrIncRef(fileID host.FileID, // Insert a corresponding record into our map. info = &entry{ ExecutableInfo: ExecutableInfo{ - Data: state.detectAndLoadInterpData(loaderInfo), - Observers: state.detectAndLoadObservers(loaderInfo), - TSDInfo: tsdInfo, + Data: state.detectAndLoadInterpData(loaderInfo), + TSDInfo: tsdInfo, }, mapRef: ref, rc: 1, @@ -342,11 +330,6 @@ type executableInfoManagerState struct { // for loading the host agent support for a specific interpreter type. interpreterLoaders []interpreter.Loader - // observerLoaders is a list of instances of an interface that provide functionality - // for loading observers for executables. Unlike interpreters, multiple observers - // can be associated with a single executable. - observerLoaders []interpreter.Loader - // ebpf provides the interface to manipulate eBPF maps. ebpf pmebpf.EbpfHandler @@ -397,38 +380,6 @@ func (state *executableInfoManagerState) detectAndLoadInterpData( return nil } -// detectAndLoadObservers attempts to detect observers for the given executable. -// Unlike interpreters, multiple observers can be loaded for a single executable. -func (state *executableInfoManagerState) detectAndLoadObservers( - loaderInfo *interpreter.LoaderInfo) []interpreter.Data { - var observers []interpreter.Data - - // Ask all observer loaders to check this executable - for _, loader := range state.observerLoaders { - data, err := loader(state.ebpf, loaderInfo) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - // Very common if the process exited when we tried to analyze it. - log.Debugf("Failed to load observer for %v (%#016x): file not found", - loaderInfo.FileName(), loaderInfo.FileID()) - } else { - log.Errorf("Loader for %v (%#016x) failed: %v", - loaderInfo.FileName(), loaderInfo.FileID(), err) - } - continue - } - - // All observers return a data instance (possibly no-op) - if data != nil { - log.Debugf("Observer data %v for %v (%#016x)", - data, loaderInfo.FileName(), loaderInfo.FileID()) - observers = append(observers, data) - } - } - - return observers -} - // loadDeltas converts the sdtypes.StackDelta to StackDeltaEBPF and passes that to // the ebpf interface to be loaded to kernel maps. While converting the deltas, it // also creates a list of all large gaps in the executable. diff --git a/processmanager/manager.go b/processmanager/manager.go index 596b09c21..cc6b79232 100644 --- a/processmanager/manager.go +++ b/processmanager/manager.go @@ -89,13 +89,11 @@ func New(ctx context.Context, includeTracers types.IncludedTracers, monitorInter } interpreters := make(map[libpf.PID]map[util.OnDiskFileIdentifier]interpreter.Instance) - observers := make(map[libpf.PID]map[util.OnDiskFileIdentifier][]interpreter.Observer) pm := &ProcessManager{ interpreterTracerEnabled: em.NumInterpreterLoaders() > 0, eim: em, interpreters: interpreters, - observers: observers, exitEvents: make(map[libpf.PID]times.KTime), pidToProcessInfo: make(map[libpf.PID]*processInfo), ebpf: ebpf, diff --git a/processmanager/processinfo.go b/processmanager/processinfo.go index 825374b69..4a4c0d5a0 100644 --- a/processmanager/processinfo.go +++ b/processmanager/processinfo.go @@ -283,60 +283,6 @@ func (pm *ProcessManager) handleNewInterpreter(pr process.Process, m *Mapping, return nil } -// handleNewObservers processes observers for a new executable mapping. -// Unlike interpreters where only one can handle an executable, multiple observers -// can be attached to the same executable. -// -// The caller is responsible to hold the ProcessManager lock to avoid race conditions. -func (pm *ProcessManager) handleNewObservers(pr process.Process, m *Mapping, - ei *eim.ExecutableInfo) { - if len(ei.Observers) == 0 { - return - } - - pid := pr.PID() - key := m.GetOnDiskFileIdentifier() - - // Initialize observer map for this PID if needed - if _, ok := pm.observers[pid]; !ok { - pm.observers[pid] = make(map[util.OnDiskFileIdentifier][]interpreter.Observer) - } - - // Check if observers are already attached for this key - if _, ok := pm.observers[pid][key]; ok { - // Already handled - return - } - - // Attach all observers - var observers []interpreter.Observer //nolint:prealloc - for _, observerData := range ei.Observers { - observer, err := observerData.Attach(pm.ebpf, pid, libpf.Address(m.Bias), - pr.GetRemoteMemory()) - if err != nil { - log.Errorf("Failed to attach observer %v to PID %v: %v", observerData, pid, err) - continue - } - if observer == nil { - continue - } - - log.Debugf("Attached observer %v to PID %v", observerData, pid) - observers = append(observers, observer) - - // Update TSD info if available - if tsdInfo := pm.getTSDInfo(pid); tsdInfo != nil { - if err := observer.UpdateTSDInfo(pm.ebpf, pid, *tsdInfo); err != nil { - log.Errorf("Failed to update observer TSDInfo for PID %v: %v", pid, err) - } - } - } - - if len(observers) > 0 { - pm.observers[pid][key] = observers - } -} - // handleNewMapping processes new file backed mappings func (pm *ProcessManager) handleNewMapping(pr process.Process, m *Mapping, elfRef *pfelf.Reference) error { @@ -362,16 +308,10 @@ func (pm *ProcessManager) handleNewMapping(pr process.Process, m *Mapping, pm.assignTSDInfo(pid, ei.TSDInfo) - // Handle interpreter if present if ei.Data != nil { - if err := pm.handleNewInterpreter(pr, m, &ei); err != nil { - return err - } + return pm.handleNewInterpreter(pr, m, &ei) } - // Handle observers if present - pm.handleNewObservers(pr, m, &ei) - return nil } @@ -545,26 +485,6 @@ func (pm *ProcessManager) processRemovedMappings(pid libpf.PID, mappings []libpf // remove the entry. delete(pm.interpreters, pid) } - - // Clean up observers that are no longer valid - for key, observerList := range pm.observers[pid] { - if _, ok := interpretersValid[key]; ok { - continue - } - for _, observer := range observerList { - if err := observer.Detach(pm.ebpf, pid); err != nil { - log.Errorf("Failed to unload observer for PID %d: %v", - pid, err) - } - } - delete(pm.observers[pid], key) - } - - if len(pm.observers[pid]) == 0 { - // There are no longer any mapped observers in the process, therefore we can - // remove the entry. - delete(pm.observers, pid) - } } // synchronizeMappings synchronizes executable mappings for the given PID. @@ -859,7 +779,6 @@ func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) { log.Debugf("PID %v deleted", pid) delete(pm.pidToProcessInfo, pid) - // Detach all interpreters for this PID for _, instance := range pm.interpreters[pid] { if err2 := instance.Detach(pm.ebpf, pid); err2 != nil { err = errors.Join(err, @@ -868,19 +787,6 @@ func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) { } } delete(pm.interpreters, pid) - - // Detach all observers for this PID - for _, observerList := range pm.observers[pid] { - for _, observer := range observerList { - if err2 := observer.Detach(pm.ebpf, pid); err2 != nil { - err = errors.Join(err, - fmt.Errorf("failed to handle observer process exit for PID %d: %v", - pid, err2)) - } - } - } - delete(pm.observers, pid) - delete(pm.exitEvents, pid) log.Debugf("PID %v exit latency %v ms", pid, (nowKTime-pidExitKTime)/1e6) } diff --git a/processmanager/types.go b/processmanager/types.go index f70075b95..9807aaae8 100644 --- a/processmanager/types.go +++ b/processmanager/types.go @@ -50,13 +50,6 @@ type ProcessManager struct { // the unique on-disk identifier of the interpreter DSO. interpreters map[libpf.PID]map[util.OnDiskFileIdentifier]interpreter.Instance - // observers records the interpreter.Observer interface which contains hooks for - // process monitoring without symbolization. Unlike interpreters, multiple observers - // can be associated with each executable. - // The key of the first map is a process ID, while the key of the second map is - // the unique on-disk identifier of the observed DSO. - observers map[libpf.PID]map[util.OnDiskFileIdentifier][]interpreter.Observer - // pidToProcessInfo keeps track of the executable memory mappings. pidToProcessInfo map[libpf.PID]*processInfo From f8aab488cde0325491dcec9aeaee75e21c5d3302 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Tue, 19 Aug 2025 12:20:44 -0400 Subject: [PATCH 2/3] interpreter: Support multiple interpreters for single ELF object (#702) Co-authored-by: Christos Kalkanis --- .../integrationtests/golabels_test.go | 159 ++++++++++++++++++ interpreter/multi.go | 143 ++++++++++++++++ processmanager/execinfomanager/manager.go | 22 ++- processmanager/manager.go | 14 +- 4 files changed, 326 insertions(+), 12 deletions(-) create mode 100644 interpreter/golabels/integrationtests/golabels_test.go create mode 100644 interpreter/multi.go diff --git a/interpreter/golabels/integrationtests/golabels_test.go b/interpreter/golabels/integrationtests/golabels_test.go new file mode 100644 index 000000000..ae5c1a33f --- /dev/null +++ b/interpreter/golabels/integrationtests/golabels_test.go @@ -0,0 +1,159 @@ +//go:build integration + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package integrationtests + +import ( + "context" + "math" + "math/rand" + "os" + "runtime/debug" + "runtime/pprof" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/ebpf-profiler/host" + "go.opentelemetry.io/ebpf-profiler/libpf" + "go.opentelemetry.io/ebpf-profiler/reporter" + "go.opentelemetry.io/ebpf-profiler/tracer" + tracertypes "go.opentelemetry.io/ebpf-profiler/tracer/types" +) + +type mockIntervals struct{} + +func (mockIntervals) MonitorInterval() time.Duration { return 1 * time.Second } +func (mockIntervals) TracePollInterval() time.Duration { return 250 * time.Millisecond } +func (mockIntervals) PIDCleanupInterval() time.Duration { return 1 * time.Second } + +type mockReporter struct{} + +func (mockReporter) ExecutableKnown(_ libpf.FileID) bool { return true } +func (mockReporter) ExecutableMetadata(_ *reporter.ExecutableMetadataArgs) {} + +func isRoot() bool { + return os.Geteuid() == 0 +} + +//nolint:gosec +func randomString(n int) string { + letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") + s := make([]rune, n) + for i := range s { + s[i] = letters[rand.Intn(len(letters))] + } + return string(s) +} + +func setPprofLabels(t *testing.T, ctx context.Context, cookie string, busyFunc func()) { + t.Helper() + labels := pprof.Labels( + "l1"+cookie, "label1"+randomString(16), + "l2"+cookie, "label2"+randomString(24), + "l3"+cookie, "label3"+randomString(48)) + lastUpdate := time.Now() + pprof.Do(context.TODO(), labels, func(context.Context) { + for time.Since(lastUpdate) < 10*time.Second { + // CPU go burr on purpose. + busyFunc() + if ctx.Err() != nil { + return + } + } + }) +} + +func Test_Golabels(t *testing.T) { + if !isRoot() { + t.Skip("root privileges required") + } + + buildInfo, ok := debug.ReadBuildInfo() + if !ok { + t.Fatalf("Failed to get build info") + } + + withCGO := false + for _, setting := range buildInfo.Settings { + if setting.Key == "CGO_ENABLED" { + withCGO = true + } + } + t.Logf("CGo is enabled: %t", withCGO) + + cookie := buildInfo.GoVersion + + t.Run(cookie, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + enabledTracers, _ := tracertypes.Parse("") + enabledTracers.Enable(tracertypes.Labels) + enabledTracers.Enable(tracertypes.GoTracer) + + trc, err := tracer.NewTracer(ctx, &tracer.Config{ + Reporter: &mockReporter{}, + Intervals: &mockIntervals{}, + IncludeTracers: enabledTracers, + SamplesPerSecond: 20, + ProbabilisticInterval: 100, + ProbabilisticThreshold: 100, + OffCPUThreshold: uint32(math.MaxUint32 / 100), + VerboseMode: true, + }) + require.NoError(t, err) + + trc.StartPIDEventProcessor(ctx) + + err = trc.AttachTracer() + require.NoError(t, err) + + t.Log("Attached tracer program") + + err = trc.EnableProfiling() + require.NoError(t, err) + + err = trc.AttachSchedMonitor() + require.NoError(t, err) + + traceCh := make(chan *host.Trace) + + err = trc.StartMapMonitors(ctx, traceCh) + require.NoError(t, err) + + go setPprofLabels(t, ctx, cookie, busyFunc) + + for trace := range traceCh { + if trace == nil { + continue + } + if len(trace.CustomLabels) > 0 { + hits := 0 + for k, v := range trace.CustomLabels { + switch k { + case "l1" + cookie: + require.Len(t, v, 22) + require.True(t, strings.HasPrefix(v, "label1")) + hits |= (1 << 0) + case "l2" + cookie: + require.Len(t, v, 30) + require.True(t, strings.HasPrefix(v, "label2")) + hits |= (1 << 1) + case "l3" + cookie: + require.Len(t, v, 47) + require.True(t, strings.HasPrefix(v, "label3")) + hits |= (1 << 2) + } + } + if hits == (1<<0 | 1<<1 | 1<<2) { + cancel() + break + } + } + } + }) +} diff --git a/interpreter/multi.go b/interpreter/multi.go new file mode 100644 index 000000000..54fc57eec --- /dev/null +++ b/interpreter/multi.go @@ -0,0 +1,143 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package interpreter // import "go.opentelemetry.io/ebpf-profiler/interpreter" + +import ( + "errors" + + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/ebpf-profiler/host" + "go.opentelemetry.io/ebpf-profiler/libpf" + "go.opentelemetry.io/ebpf-profiler/metrics" + "go.opentelemetry.io/ebpf-profiler/process" + "go.opentelemetry.io/ebpf-profiler/remotememory" + "go.opentelemetry.io/ebpf-profiler/reporter" + "go.opentelemetry.io/ebpf-profiler/tpbase" +) + +// MultiData implements the Data interface for multiple interpreters. +type MultiData struct { + interpreters []Data +} + +// NewMultiData creates a new MultiData instance from multiple Data instances. +func NewMultiData(interpreters []Data) *MultiData { + return &MultiData{ + interpreters: interpreters, + } +} + +// Attach attaches all interpreters and returns a MultiInstance. +func (m *MultiData) Attach(ebpf EbpfHandler, pid libpf.PID, bias libpf.Address, + rm remotememory.RemoteMemory) (Instance, error) { + var instances []Instance + var errs []error + + for _, data := range m.interpreters { + instance, err := data.Attach(ebpf, pid, bias, rm) + if err != nil { + errs = append(errs, err) + continue + } + if instance != nil { + instances = append(instances, instance) + } + } + + err := errors.Join(errs...) + if len(instances) == 0 { + // Either all interpreters returned nil instances without error (e.g., not ready yet) + // in which case return nil, nil (valid state) otherwise return combined error. + return nil, err + } + + // We got at least one valid instance, log any errors that occurred + if err != nil { + log.Errorf("Errors occurred while attaching interpreters: %v", err) + } + + return NewMultiInstance(instances), nil +} + +// Unload unloads all interpreters. +func (m *MultiData) Unload(ebpf EbpfHandler) { + for _, data := range m.interpreters { + data.Unload(ebpf) + } +} + +// MultiInstance implements the Instance interface for multiple interpreters. +type MultiInstance struct { + instances []Instance +} + +// NewMultiInstance creates a new MultiInstance from multiple Instance instances. +func NewMultiInstance(instances []Instance) *MultiInstance { + return &MultiInstance{ + instances: instances, + } +} + +// Detach detaches all interpreter instances. +func (m *MultiInstance) Detach(ebpf EbpfHandler, pid libpf.PID) error { + var errs []error + for _, instance := range m.instances { + if err := instance.Detach(ebpf, pid); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + +// SynchronizeMappings synchronizes mappings for all interpreter instances. +func (m *MultiInstance) SynchronizeMappings(ebpf EbpfHandler, + symbolReporter reporter.SymbolReporter, pr process.Process, mappings []process.Mapping) error { + var errs []error + for _, instance := range m.instances { + if err := instance.SynchronizeMappings(ebpf, symbolReporter, pr, mappings); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + +// UpdateTSDInfo updates TSD info for all interpreter instances. +func (m *MultiInstance) UpdateTSDInfo(ebpf EbpfHandler, pid libpf.PID, info tpbase.TSDInfo) error { + var errs []error + for _, instance := range m.instances { + if err := instance.UpdateTSDInfo(ebpf, pid, info); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + +// Symbolize tries to symbolize the frame with each interpreter instance until one succeeds. +func (m *MultiInstance) Symbolize(ebpfFrame *host.Frame, frames *libpf.Frames) error { + // Try each interpreter in order + for _, instance := range m.instances { + err := instance.Symbolize(ebpfFrame, frames) + if err != ErrMismatchInterpreterType { + return err + } + } + return ErrMismatchInterpreterType +} + +// GetAndResetMetrics collects metrics from all interpreter instances. +func (m *MultiInstance) GetAndResetMetrics() ([]metrics.Metric, error) { + var allMetrics []metrics.Metric + var errs []error + + for _, instance := range m.instances { + metrics, err := instance.GetAndResetMetrics() + if err != nil { + errs = append(errs, err) + continue + } + allMetrics = append(allMetrics, metrics...) + } + + return allMetrics, errors.Join(errs...) +} diff --git a/processmanager/execinfomanager/manager.go b/processmanager/execinfomanager/manager.go index 131a0e492..62dbf737a 100644 --- a/processmanager/execinfomanager/manager.go +++ b/processmanager/execinfomanager/manager.go @@ -351,9 +351,11 @@ type executableInfoManagerState struct { // detectAndLoadInterpData attempts to detect the given executable as an interpreter. If detection // succeeds, it then loads additional per-interpreter data into the BPF maps and returns the -// interpreter data. +// interpreter data. If multiple loaders recognize the executable, it returns a MultiData instance. func (state *executableInfoManagerState) detectAndLoadInterpData( loaderInfo *interpreter.LoaderInfo) interpreter.Data { + var interpreterDatas []interpreter.Data //nolint:prealloc + // Ask all interpreter loaders whether they want to handle this executable. for _, loader := range state.interpreterLoaders { data, err := loader(state.ebpf, loaderInfo) @@ -366,7 +368,8 @@ func (state *executableInfoManagerState) detectAndLoadInterpData( log.Errorf("Failed to load %v (%#016x): %v", loaderInfo.FileName(), loaderInfo.FileID(), err) } - return nil + // Continue checking other loaders even if one fails + continue } if data == nil { continue @@ -374,10 +377,21 @@ func (state *executableInfoManagerState) detectAndLoadInterpData( log.Debugf("Interpreter data %v for %v (%#016x)", data, loaderInfo.FileName(), loaderInfo.FileID()) - return data + interpreterDatas = append(interpreterDatas, data) } - return nil + // Return based on how many interpreters matched + switch len(interpreterDatas) { + case 0: + return nil + case 1: + return interpreterDatas[0] + default: + // Multiple interpreters matched, create a MultiData + log.Debugf("Multiple interpreters (%d) matched for %v (%#016x)", + len(interpreterDatas), loaderInfo.FileName(), loaderInfo.FileID()) + return interpreter.NewMultiData(interpreterDatas) + } } // loadDeltas converts the sdtypes.StackDelta to StackDeltaEBPF and passes that to diff --git a/processmanager/manager.go b/processmanager/manager.go index cc6b79232..60ac27277 100644 --- a/processmanager/manager.go +++ b/processmanager/manager.go @@ -119,20 +119,18 @@ func metricSummaryToSlice(summary metrics.Summary) []metrics.Metric { return result } -// updateMetricSummary gets the metrics from the provided interpreter instance and updaates the +// updateMetricSummary gets the metrics from the provided interpreter instance and updates the // provided summary by aggregating the new metrics into the summary. // The caller is responsible to hold the lock on the interpreter.Instance to avoid race conditions. func updateMetricSummary(ii interpreter.Instance, summary metrics.Summary) error { instanceMetrics, err := ii.GetAndResetMetrics() - if err != nil { - return err - } - + // Update metrics even if there was an error, because it's possible ii is a MultiInstance + // and some of the instances may have returned metrics. for _, metric := range instanceMetrics { summary[metric.ID] += metric.Value } - return nil + return err } // collectInterpreterMetrics starts a goroutine that periodically fetches and reports interpreter @@ -146,8 +144,8 @@ func collectInterpreterMetrics(ctx context.Context, pm *ProcessManager, summary := make(map[metrics.MetricID]metrics.MetricValue) for pid := range pm.interpreters { - for addr := range pm.interpreters[pid] { - if err := updateMetricSummary(pm.interpreters[pid][addr], summary); err != nil { + for addr, ii := range pm.interpreters[pid] { + if err := updateMetricSummary(ii, summary); err != nil { log.Errorf("Failed to get/reset metrics for PID %d at 0x%x: %v", pid, addr, err) } From 227b79dae419c55a12b1973842bf912aab389c58 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Mon, 8 Sep 2025 16:03:49 -0400 Subject: [PATCH 3/3] Convert oomwatcher to normal interpreter --- interpreter/multi.go | 5 +++-- processmanager/execinfomanager/manager.go | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/interpreter/multi.go b/interpreter/multi.go index 54fc57eec..aea1a02ef 100644 --- a/interpreter/multi.go +++ b/interpreter/multi.go @@ -114,10 +114,11 @@ func (m *MultiInstance) UpdateTSDInfo(ebpf EbpfHandler, pid libpf.PID, info tpba } // Symbolize tries to symbolize the frame with each interpreter instance until one succeeds. -func (m *MultiInstance) Symbolize(ebpfFrame *host.Frame, frames *libpf.Frames) error { +func (m *MultiInstance) Symbolize(symbolReporter reporter.SymbolReporter, frame *host.Frame, + trace *libpf.Trace) error { // Try each interpreter in order for _, instance := range m.instances { - err := instance.Symbolize(ebpfFrame, frames) + err := instance.Symbolize(symbolReporter, frame, trace) if err != ErrMismatchInterpreterType { return err } diff --git a/processmanager/execinfomanager/manager.go b/processmanager/execinfomanager/manager.go index 62dbf737a..ccca545ec 100644 --- a/processmanager/execinfomanager/manager.go +++ b/processmanager/execinfomanager/manager.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/ebpf-profiler/interpreter/hotspot" "go.opentelemetry.io/ebpf-profiler/interpreter/luajit" "go.opentelemetry.io/ebpf-profiler/interpreter/nodev8" + "go.opentelemetry.io/ebpf-profiler/interpreter/oomwatcher" "go.opentelemetry.io/ebpf-profiler/interpreter/perl" "go.opentelemetry.io/ebpf-profiler/interpreter/php" "go.opentelemetry.io/ebpf-profiler/interpreter/python" @@ -136,6 +137,7 @@ func NewExecutableInfoManager( interpreterLoaders = append(interpreterLoaders, luajit.Loader) } + interpreterLoaders = append(interpreterLoaders, oomwatcher.Loader) interpreterLoaders = append(interpreterLoaders, apmint.Loader) if includeTracers.Has(types.Labels) { interpreterLoaders = append(interpreterLoaders, golabels.Loader)