diff --git a/collector/internal/controller.go b/collector/internal/controller.go index d414ee5e0..b5597e29e 100644 --- a/collector/internal/controller.go +++ b/collector/internal/controller.go @@ -38,7 +38,6 @@ func NewController(cfg *controller.Config, ExecutablesCacheElements: 16384, // Next step: Calculate FramesCacheElements from numCores and samplingRate. FramesCacheElements: 131072, - CGroupCacheElements: 1024, SamplesPerSecond: cfg.SamplesPerSecond, }, nextConsumer) if err != nil { diff --git a/host/host.go b/host/host.go index 249e9dbf9..192830690 100644 --- a/host/host.go +++ b/host/host.go @@ -50,6 +50,7 @@ type Trace struct { Comm string ProcessName string ExecutablePath string + ContainerID string Frames []Frame Hash TraceHash KTime times.KTime diff --git a/libpf/cgroupv2.go b/libpf/cgroupv2.go deleted file mode 100644 index aa5d952f4..000000000 --- a/libpf/cgroupv2.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf" - -import ( - "bufio" - "fmt" - "os" - "regexp" - - lru "github.com/elastic/go-freelru" - log "github.com/sirupsen/logrus" -) - -var ( - cgroupv2PathPattern = regexp.MustCompile(`0:.*?:(.*)`) -) - -// LookupCgroupv2 returns the cgroupv2 ID for pid. -func LookupCgroupv2(cgrouplru *lru.SyncedLRU[PID, string], pid PID) (string, error) { - id, ok := cgrouplru.Get(pid) - if ok { - return id, nil - } - - // Slow path - f, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid)) - if err != nil { - return "", err - } - defer f.Close() - - var genericCgroupv2 string - scanner := bufio.NewScanner(f) - buf := make([]byte, 512) - // Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes). - // We can do that and also set a maximum allocation size on the following call. - // With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't - // expect lines in /proc//cgroup to be longer than that. - scanner.Buffer(buf, 8192) - var pathParts []string - for scanner.Scan() { - line := scanner.Text() - pathParts = cgroupv2PathPattern.FindStringSubmatch(line) - if pathParts == nil { - log.Debugf("Could not extract cgroupv2 path from line: %s", line) - continue - } - genericCgroupv2 = pathParts[1] - break - } - - // Cache the cgroupv2 information. - // To avoid busy lookups, also empty cgroupv2 information is cached. - cgrouplru.Add(pid, genericCgroupv2) - - return genericCgroupv2, nil -} diff --git a/main.go b/main.go index 607096084..089d4d3ba 100644 --- a/main.go +++ b/main.go @@ -115,7 +115,6 @@ func mainWithExitCode() exitCode { ExecutablesCacheElements: 16384, // Next step: Calculate FramesCacheElements from numCores and samplingRate. FramesCacheElements: 131072, - CGroupCacheElements: 1024, SamplesPerSecond: cfg.SamplesPerSecond, }) if err != nil { diff --git a/processmanager/helpers.go b/processmanager/helpers.go index 6f4b70224..db3de2890 100644 --- a/processmanager/helpers.go +++ b/processmanager/helpers.go @@ -4,6 +4,12 @@ package processmanager // import "go.opentelemetry.io/ebpf-profiler/processmanager" import ( + "bufio" + "fmt" + "io" + "os" + "regexp" + lru "github.com/elastic/go-freelru" log "github.com/sirupsen/logrus" @@ -11,6 +17,11 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf" ) +//nolint:lll +var ( + cgroupv2ContainerIDPattern = regexp.MustCompile(`0:.*?:.*?([0-9a-fA-F]{64})(?:\.scope)?(?:/[a-z]+)?$`) +) + type lruFileIDMapper struct { cache *lru.SyncedLRU[host.FileID, libpf.FileID] } @@ -79,3 +90,36 @@ type FileIDMapper interface { // Set adds a mapping from the 64-bit file ID to the 128-bit file ID. Set(pre host.FileID, post libpf.FileID) } + +func parseContainerID(cgroupFile io.Reader) string { + scanner := bufio.NewScanner(cgroupFile) + buf := make([]byte, 512) + // Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes). + // We can do that and also set a maximum allocation size on the following call. + // With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't + // expect lines in /proc//cgroup to be longer than that. + scanner.Buffer(buf, 8192) + var pathParts []string + for scanner.Scan() { + line := scanner.Text() + pathParts = cgroupv2ContainerIDPattern.FindStringSubmatch(line) + if pathParts == nil { + log.Debugf("Could not extract cgroupv2 path from line: %s", line) + continue + } + return pathParts[1] + } + + // No containerID could be extracted + return "" +} + +// extractContainerID returns the containerID for pid if cgroup v2 is used. +func extractContainerID(pid libpf.PID) (string, error) { + cgroupFile, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid)) + if err != nil { + return "", err + } + + return parseContainerID(cgroupFile), nil +} diff --git a/processmanager/helpers_test.go b/processmanager/helpers_test.go new file mode 100644 index 000000000..8510ca96d --- /dev/null +++ b/processmanager/helpers_test.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package processmanager // import "go.opentelemetry.io/ebpf-profiler/processmanager" + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +//nolint:lll +func TestExtractContainerID(t *testing.T) { + tests := []struct { + line string + expectedContainerID string + }{ + { + line: "0::/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podf6f2d169_f2ae_4afa-95ed_06ff2ed6b288.slice/cri-containerd-b4d6d161c62525d726fa394b27df30e14f8ea5646313ada576b390de70cfc8cc.scope", + expectedContainerID: "b4d6d161c62525d726fa394b27df30e14f8ea5646313ada576b390de70cfc8cc", + }, + { + line: "0::/kubepods/besteffort/pod05e102bf-8744-4942-a241-9b6f07983a53/f52a212505a606972cf8614c3cb856539e71b77ecae33436c5ac442232fbacf8", + expectedContainerID: "f52a212505a606972cf8614c3cb856539e71b77ecae33436c5ac442232fbacf8", + }, + { + line: "0::/kubepods/besteffort/pod897277d4-5e6f-4999-a976-b8340e8d075e/crio-a4d6b686848a610472a2eed3ae20d4d64b6b4819feb9fdfc7fd7854deaf59ef3", + expectedContainerID: "a4d6b686848a610472a2eed3ae20d4d64b6b4819feb9fdfc7fd7854deaf59ef3", + }, + { + line: "0::/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod4c9f1974_5c46_44c2_b42f_3bbf0e98eef9.slice/cri-containerd-bacb920470900725e0aa7d914fee5eb0854315448b024b6b8420ad8429c607ba.scope", + expectedContainerID: "bacb920470900725e0aa7d914fee5eb0854315448b024b6b8420ad8429c607ba", + }, + { + line: "0::/user.slice/user-1000.slice/user@1000.service/app.slice/app-org.gnome.Terminal.slice/vte-spawn-868f9513-eee8-457d-8e36-1b37ae8ae622.scope", + }, + { + line: "0::/../../user.slice/user-501.slice/session-3.scope", + }, + { + line: "0::/system.slice/docker-b1eba9dfaeba29d8b80532a574a03ea3cac29384327f339c26da13649e2120df.scope/init", + expectedContainerID: "b1eba9dfaeba29d8b80532a574a03ea3cac29384327f339c26da13649e2120df", + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.expectedContainerID, func(t *testing.T) { + reader := bytes.NewReader([]byte(tc.line)) + + gotContainerID := parseContainerID(reader) + assert.Equal(t, tc.expectedContainerID, gotContainerID) + }) + } +} diff --git a/processmanager/processinfo.go b/processmanager/processinfo.go index 84008bccc..4a4c0d5a0 100644 --- a/processmanager/processinfo.go +++ b/processmanager/processinfo.go @@ -145,10 +145,16 @@ func (pm *ProcessManager) updatePidInformation(pid libpf.PID, m *Mapping) (bool, } } + containerID, err := extractContainerID(pid) + if err != nil { + log.Debugf("Failed extracting containerID for %d: %v", pid, err) + } + info = &processInfo{ meta: ProcessMeta{ Name: processName, Executable: exePath, + ContainerID: containerID, EnvVariables: envVarMap}, mappings: make(map[libpf.Address]*Mapping), mappingsByFileID: make(map[host.FileID]map[libpf.Address]*Mapping), diff --git a/processmanager/types.go b/processmanager/types.go index b088752bf..9807aaae8 100644 --- a/processmanager/types.go +++ b/processmanager/types.go @@ -147,6 +147,8 @@ type ProcessMeta struct { Executable string // process env vars from /proc/PID/environ EnvVariables map[string]string + // container ID retrieved from /proc/PID/cgroup + ContainerID string } // processInfo contains information about the executable mappings diff --git a/reporter/base_reporter.go b/reporter/base_reporter.go index 364133ed1..baa017fd8 100644 --- a/reporter/base_reporter.go +++ b/reporter/base_reporter.go @@ -34,9 +34,6 @@ type baseReporter struct { // pdata holds the generator for the data being exported. pdata *pdata.Pdata - // cgroupv2ID caches PID to container ID information for cgroupv2 containers. - cgroupv2ID *lru.SyncedLRU[libpf.PID, string] - // traceEvents stores reported trace events (trace metadata with frames and counts) traceEvents xsync.RWMutex[samples.TraceEventsTree] @@ -96,12 +93,7 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE extraMeta = b.cfg.ExtraSampleAttrProd.CollectExtraSampleMeta(trace, meta) } - containerID, err := libpf.LookupCgroupv2(b.cgroupv2ID, meta.PID) - if err != nil { - log.Debugf("Failed to get a cgroupv2 ID as container ID for PID %d: %v", - meta.PID, err) - } - + containerID := meta.ContainerID key := samples.TraceAndMetaKey{ Hash: trace.Hash, Comm: meta.Comm, diff --git a/reporter/collector_reporter.go b/reporter/collector_reporter.go index 4ae5bc349..529fdaf27 100644 --- a/reporter/collector_reporter.go +++ b/reporter/collector_reporter.go @@ -5,7 +5,6 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" import ( "context" - "time" lru "github.com/elastic/go-freelru" log "github.com/sirupsen/logrus" @@ -29,14 +28,6 @@ type CollectorReporter struct { // NewCollector builds a new CollectorReporter func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorReporter, error) { - cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements, - func(pid libpf.PID) uint32 { return uint32(pid) }) - if err != nil { - return nil, err - } - // Set a lifetime to reduce the risk of invalid data in case of PID reuse. - cgroupv2ID.SetLifetime(90 * time.Second) - // Next step: Dynamically configure the size of this LRU. // Currently, we use the length of the JSON array in // hostmetadata/hostmetadata.json. @@ -63,7 +54,6 @@ func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorRepor name: cfg.Name, version: cfg.Version, pdata: data, - cgroupv2ID: cgroupv2ID, traceEvents: xsync.NewRWMutex(tree), hostmetadata: hostmetadata, runLoop: &runLoop{ @@ -85,7 +75,6 @@ func (r *CollectorReporter) Start(ctx context.Context) error { }, func() { // Allow the GC to purge expired entries to avoid memory leaks. r.pdata.Purge() - r.cgroupv2ID.PurgeExpired() }) // When Stop() is called and a signal to 'stop' is received, then: diff --git a/reporter/collector_reporter_test.go b/reporter/collector_reporter_test.go index d5807d465..7a8b2d2e8 100644 --- a/reporter/collector_reporter_test.go +++ b/reporter/collector_reporter_test.go @@ -54,7 +54,6 @@ func TestCollectorReporterReportTraceEvent(t *testing.T) { r, err := NewCollector(&Config{ ExecutablesCacheElements: 1, FramesCacheElements: 1, - CGroupCacheElements: 1, }, next) require.NoError(t, err) if err := r.ReportTraceEvent(tt.trace, tt.meta); err != nil && diff --git a/reporter/config.go b/reporter/config.go index 0c13f47cf..ae05eb57d 100644 --- a/reporter/config.go +++ b/reporter/config.go @@ -30,8 +30,6 @@ type Config struct { ExecutablesCacheElements uint32 // FramesCacheElements defines the item capacity of the frames cache. FramesCacheElements uint32 - // CGroupCacheElements defines the item capacity of the cgroup cache. - CGroupCacheElements uint32 // samplesPerSecond defines the number of samples per second. SamplesPerSecond int diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 2191c6583..ea7f5ba68 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -41,14 +41,6 @@ type OTLPReporter struct { // NewOTLP returns a new instance of OTLPReporter func NewOTLP(cfg *Config) (*OTLPReporter, error) { - cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements, - func(pid libpf.PID) uint32 { return uint32(pid) }) - if err != nil { - return nil, err - } - // Set a lifetime to reduce risk of invalid data in case of PID reuse. - cgroupv2ID.SetLifetime(90 * time.Second) - // Next step: Dynamically configure the size of this LRU. // Currently, we use the length of the JSON array in // hostmetadata/hostmetadata.json. @@ -75,7 +67,6 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { name: cfg.Name, version: cfg.Version, pdata: data, - cgroupv2ID: cgroupv2ID, traceEvents: xsync.NewRWMutex(eventsTree), hostmetadata: hostmetadata, runLoop: &runLoop{ @@ -110,7 +101,6 @@ func (r *OTLPReporter) Start(ctx context.Context) error { }, func() { // Allow the GC to purge expired entries to avoid memory leaks. r.pdata.Purge() - r.cgroupv2ID.PurgeExpired() }) // When Stop() is called and a signal to 'stop' is received, then: diff --git a/reporter/samples/samples.go b/reporter/samples/samples.go index 319b97d7c..c105fc230 100644 --- a/reporter/samples/samples.go +++ b/reporter/samples/samples.go @@ -11,6 +11,7 @@ type TraceEventMeta struct { ProcessName string ExecutablePath string APMServiceName string + ContainerID string PID, TID libpf.PID CPU int Origin libpf.Origin @@ -39,7 +40,7 @@ type TraceAndMetaKey struct { // comm and apmServiceName are provided by the eBPF programs Comm string ApmServiceName string - // containerID is annotated based on PID information + // ContainerID is annotated based on PID information ContainerID string Pid int64 Tid int64 diff --git a/tracehandler/tracehandler.go b/tracehandler/tracehandler.go index 58775c483..e89ae1f54 100644 --- a/tracehandler/tracehandler.go +++ b/tracehandler/tracehandler.go @@ -125,6 +125,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { CPU: bpfTrace.CPU, ProcessName: bpfTrace.ProcessName, ExecutablePath: bpfTrace.ExecutablePath, + ContainerID: bpfTrace.ContainerID, Origin: bpfTrace.Origin, OffTime: bpfTrace.OffTime, EnvVars: bpfTrace.EnvVars, diff --git a/tracer/tracer.go b/tracer/tracer.go index d89bf25e8..c43ef9634 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -871,6 +871,7 @@ func (t *Tracer) loadBpfTrace(raw []byte, cpu int) *host.Trace { trace := &host.Trace{ Comm: C.GoString((*C.char)(unsafe.Pointer(&ptr.comm))), ExecutablePath: procMeta.Executable, + ContainerID: procMeta.ContainerID, ProcessName: procMeta.Name, APMTraceID: *(*libpf.APMTraceID)(unsafe.Pointer(&ptr.apm_trace_id)), APMTransactionID: *(*libpf.APMTransactionID)(unsafe.Pointer(&ptr.apm_transaction_id)),